AMQ Streams on OpenShift の使用
OpenShift Container Platform 上で AMQ Streams 1.4 を使用
概要
第1章 AMQ Streams の概要
AMQ Streams は、OpenShift クラスターで Apache Kafka を実行するプロセスを簡素化します。
1.1. Kafka の機能
Kafka の基盤のデータストリーム処理機能とコンポーネントアーキテクチャーによって以下が提供されます。
- スループットが非常に高く、レイテンシーが低い状態でデータを共有するマイクロサービスおよびその他のアプリケーション。
- メッセージの順序の保証。
- アプリケーションの状態を再構築するためにデータストレージからメッセージを巻き戻し/再生。
- キーバリューログの使用時に古いレコードを削除するメッセージ圧縮。
- クラスター設定での水平スケーラビリティー。
- 耐障害性を制御するデータのレプリケーション。
- 即座にアクセスするために大容量のデータを保持。
1.2. Kafka のユースケース
Kafka の機能は、以下に適しています。
- イベント駆動型のアーキテクチャー。
- アプリケーションの状態変更をイベントのログとしてキャプチャーするイベントソーシング。
- メッセージのブローカー。
- Web サイトアクティビティーの追跡。
- メトリクスによるオペレーションの監視。
- ログの収集および集計。
- 分散システムのログのコミット。
- アプリケーションがリアルタイムでデータに対応できるようにするストリーム処理。
1.3. AMQ Streams による Kafka のサポート
AMQ Streams は、Kafka を OpenShift で実行するためのコンテナーイメージおよび Operator を提供します。AMQ Streams Operator は、AMQ Streams の実行に必要です。AMQ Streams で提供される Operator は、Kafka を効果的に管理するために、専門的なオペレーション情報で目的に合うよう構築されています。
Operator は以下のプロセスを単純化します。
- Kafka クラスターのデプロイおよび実行。
- Kafka コンポーネントのデプロイおよび実行。
- Kafka へアクセスするための設定。
- Kafka へのアクセスをセキュア化。
- Kafka のアップグレード。
- ブローカーの管理。
- トピックの作成および管理。
- ユーザーの作成および管理。
1.4. Operator
AMQ Streams は、OpenShift クラスター内で実行中の Kafka クラスターを管理するための Operator を提供します。
- Cluster Operator
- Apache Kafka クラスター、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter、および Entity Operator をデプロイおよび管理します。
- Entitiy Operator
- Topic Operator および User Operator を構成します。
- Topic Operator
- Kafka トピックを管理します。
- User Operator
- Kafka ユーザーを管理します。
Cluster Operator は、Kafka クラスターと同時に、Topic Operator および User Operator を Entity Operator 設定の一部としてデプロイできます。
AMQ Streams アーキテクチャー内の Operator
1.5. AMQ Streams のインストール方法
AMQ Streams を OpenShift にインストールする方法は 2 つあります。
インストール方法 | 説明 | サポート対象バージョン |
---|---|---|
インストールアーティファクト (YAML ファイル) |
AMQ Streams のダウンロードサイト から | OpenShift 3.11 以上 |
OperatorHub | OperatorHub で AMQ Streams Operator を使用し、Cluster Operator を単一またはすべての namespace にデプロイします。 | OpenShift 4.x のみ |
柔軟性が重要な場合は、インストールアーティファクトによる方法を選択します。OpenShift 4 Web コンソールを使用して標準設定で AMQ Streams を OpenShift 4 にインストールする場合は、OperatorHub による方法を選択します。OperatorHub を使用すると、自動更新も利用できます。
どちらの方法でも、Cluster Operator は OpenShift クラスターにデプロイされ、提供される YAML サンプルファイルを使用して、Kafka クラスターから順に AMQ Streams の他のコンポーネントをデプロイする準備が整います。
AMQ Streams インストールアーティファクト
AMQ Streams インストールアーティファクトには、OpenShift にデプロイできるさまざまな YAML ファイルが含まれ、oc
使用して以下を含むカスタムリソースが作成されます。
- デプロイメント
- Custom Resource Definition (CRD)
- ロールおよびロールバインディング
- サービスアカウント
YAML インストールファイルは、Cluster Operator、Topic Operator、User Operator、および Strimzi Admin ロールに提供されます。
OperatorHub
OpenShift 4 では、Operator Lifecycle Manager (OLM) を使用することにより、クラスター管理者はクラスター全体で実行されるすべての Operator やそれらの関連サービスをインストール、更新、および管理できます。OLM は、Kubernetes のネイティブアプリケーション (Operator) を効率的に自動化された拡張可能な方法で管理するために設計されたオープンソースツールキットの Operator Framework の一部です。
OperatorHub は OpenShift 4 Web コンソールの一部です。クラスター管理者はこれを使用して Operator を検出、インストール、およびアップグレードできます。Operator は OperatorHub からプルでき、単一の (プロジェクト) namespace またはすべての (プロジェクト) namespace への OpenShift クラスターにインストールできます。Operator は OLM で管理できます。エンジニアリングチームは OLM を使用して、独自に開発、テスト、および本番環境でソフトウェアを管理できます。
OperatorHub は、バージョン 4 未満の OpenShift では使用できません。
AMQ Streams Operator
AMQ Streams Operator は OperatorHub からインストールできます。AMQ Streams Operator のインストール後、必要な CRD およびロールベースアクセス制御 (RBAC) リソースと共に Cluster Operator が OpenShift クラスターにデプロイされます。
その他のリソース
インストールアーティファクトを使用した AMQ Streams のインストール:
OperatorHub からの AMQ Streams のインストール:
- 「OperatorHub からの Cluster Operator のデプロイ」
- OpenShift ドキュメントの『Operator』
1.6. 本書の表記慣例
置き換え可能なテキスト
本書では、置き換え可能なテキストは等幅フォントおよびイタリック体で記載されています。
たとえば、以下のコードでは my-namespace
を namespace の名前に置き換えます。
sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
第2章 AMQ Streams の使用
AMQ Streams は、パブリックおよびプライベートクラウドからデプロイメントを目的とするローカルデプロイメントまで、ディストリビューションに関係なくすべてのタイプの OpenShift クラスターで動作するよう設計されています。AMQ Streams は、OpenShift 固有の一部機能をサポートします。そのようなインテグレーションは OpenShift ユーザーに有用で、標準の OpenShift を使用して同様に実装することはできません。
本ガイドでは、OpenShift クラスターが使用できることを仮定し、さらに oc
コマンドラインツールがインストールされ、稼働中のクラスターに接続するように設定されていることを仮定しています。
AMQ Streams は Strimzi 0.17.x をベースとしています。本章では、OpenShift 3.11 以降に AMQ Streams をデプロイする方法を説明します。
本ガイドのコマンドを実行するには、クラスターユーザーに RBAC (ロールベースアクセス制御) および CRD を管理する権限を付与する必要があります。
2.1. AMQ Streams のインストールおよびコンポーネントのデプロイ
AMQ Streams をインストールするには、AMQ Streams のダウンロードページ から amq-streams-x.y.z-ocp-install-examples.zip
ファイルをダウンロードし、展開します。
フォルダーには、複数の YAML ファイルが含まれています。これらのファイルは、AMQ Streams のコンポーネントを OpenShift にデプロイするのに役立ち、共通の操作を実行し、Kafka クラスターを設定します。YAML ファイルは本書を通して参照されます。
本章の後半では、提供される YAML ファイルを使用してコンポーネントを OpenShift にデプロイするための各コンポーネントおよび手順の概要を取り上げます。
AMQ Streams のコンテナーイメージは Red Hat Container Catalog で使用できますが、この代わりに提供される YAML ファイルを使用することが推奨されます。
2.2. カスタムリソース
カスタムリソースを使用すると、デフォルトの AMQ Streams デプロイメントを設定し、変更を追加することができます。カスタムリソースを使用するには、最初にカスタムリソース定義を指定する必要があります。
カスタムリソース定義 (CRD) は Kubernetes API を拡張し、カスタムリソースを OpenShift クラスターに追加する定義を提供します。カスタムリソースは、CRD によって追加される API のインスタンスとして作成されます。
AMQ Streams では、Kafka、Kafka Connect、Kafka MirrorMaker 、ユーザーおよびトピックカスタムリソースなどの AMQ Streams に固有のカスタムリソースが CRD によって OpenShift クラスターに導入されます。CRD によって設定手順が提供され、AMQ Streams 固有のリソースをインスタンス化および管理するために使用されるスキーマが定義されます。また、CRD によって、CLI へのアクセスや設定検証などのネイティブ OpenShift 機能を AMQ Streams リソースで活用することもできます。
CRD はクラスターで 1 度インストールする必要があります。クラスターの設定によりますが、インストールには通常、クラスター管理者権限が必要です。
カスタムリソースの管理は、 AMQ Streams 管理者 のみが行えます。
CRD およびカスタムリソースは YAML ファイルとして定義されます。
kind:Kafka
などの新しい kind
リソースは、OpenShift クラスター内で CRD によって定義されます。
Kubernetes API サーバーを使用すると、kind
を基にしたカスタムリソースの作成が可能になり、カスタムリソースが OpenShift クラスターに追加されたときにカスタムリソースの検証および格納方法を CRD から判断します。
CRD が削除されると、そのタイプのカスタムタイプも削除されます。さらに、Pod や Statefulset などのカスタムリソースによって作成されたリソースも削除されます。
2.2.1. AMQ Streams カスタムリソースの例
AMQ Streams 固有の各カスタムリソースは、リソースの kind
の CRD によって定義されるスキーマに準拠します。
CRD とカスタムリソースの関係を理解するため、Kafka トピックの CRD の例を見てみましょう。
Kafka トピックの CRD
apiVersion: kafka.strimzi.io/v1beta1 kind: CustomResourceDefinition metadata: 1 name: kafkatopics.kafka.strimzi.io labels: app: strimzi spec: 2 group: kafka.strimzi.io versions: v1beta1 scope: Namespaced names: # ... singular: kafkatopic plural: kafkatopics shortNames: - kt 3 additionalPrinterColumns: 4 # ... subresources: status: {} 5 validation: 6 openAPIV3Schema: properties: spec: type: object properties: partitions: type: integer minimum: 1 replicas: type: integer minimum: 1 maximum: 32767 # ...
- 1
- CRD を識別するためのトピック CRD、その名前および名前のメタデータ。
- 2
- この CRD に指定された項目には、トピックの API にアクセスするため URL に使用されるグルShortNameープ (ドメイン) 名、複数名、およびサポートされるスキーマバージョンが含まれます。他の名前は、CLI のインスタンスリソースを識別するために使用されます。例:
oc get kafkatopic my-topic
またはoc get kafkatopics
- 3
- ShortName は CLI コマンドで使用できます。たとえば、
oc get kafkatopic
の代わりにoc get kt
を略名として使用できます。 - 4
- カスタムリソースで
get
コマンドを使用する場合に示される情報。 - 5
- リソースの スキーマ参照 に記載されている CRD の現在のステータス。
- 6
- openAPIV3Schema 検証によって、トピックカスタムリソースの作成が検証されます。たとえば、トピックには 1 つ以上のパーティションと 1 つのレプリカが必要です。
ファイル名に、インデックス番号とそれに続く「Crd」が含まれるため、AMQ Streams インストールファイルと提供される CRD YAML ファイルを識別できます。
KafkaTopic
カスタムリソースに該当する例は次のとおりです。
Kafka トピックカスタムリソース
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic 1 metadata: name: my-topic labels: strimzi.io/cluster: my-cluster 2 spec: 3 partitions: 1 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824 status: conditions: 4 lastTransitionTime: "2019-08-20T11:37:00.706Z" status: "True" type: Ready observedGeneration: 1 / ...
- 1
kind
およびapiVersion
によって、インスタンスであるカスタムリソースの CRD が特定されます。- 2
- トピックまたはユーザーが属する Kafka クラスターの名前 (
Kafka
リソースの名前と同じ) を定義する、KafkaTopic
およびKafkaUser
リソースのみに適用可能なラベル。この名前は、トピックまたはユーザーの作成時に Kafka クラスターを識別するために Topic Operator および User Operator によって使用されます。
- 3
- 指定内容には、トピックのパーティション数およびレプリカ数や、トピック自体の設定パラメーターが示されています。この例では、メッセージがトピックに保持される期間や、ログのセグメントファイルサイズが指定されています。
- 4
KafkaTopic
リソースのステータス条件。lastTransitionTime
でtype
条件がReady
に変更されています。
プラットフォーム CLI からカスタムリソースをクラスターに適用できます。カスタムリソースが作成されると、Kubernetes API の組み込みリソースと同じ検証が使用されます。
KafkaTopic
の作成後、Topic Operator は通知を受け取り、該当する Kafka トピックが AMQ Streams で作成されます。
2.2.2. AMQ Streams カスタムリソースのステータス
AMQ Streams カスタムリソースの status
プロパティーは、リソースに関する情報を必要とするユーザーおよびツールにその情報をパブリッシュします。
下記の表のとおり、複数のリソースに status
プロパティーがあります。
AMQ Streams リソース | スキーマ参照 | ステータス情報がパブリッシュされる場所 |
---|---|---|
| Kafka クラスター | |
| デプロイされている場合は Kafka Connect クラスター。 | |
| デプロイされている場合は Source-to-Image (S2I) サポートのある Kafka Connect クラスター。 | |
|
デプロイされている場合は | |
| デプロイされている場合は Kafka MirrorMakerツール。 | |
| Kafka クラスターの Kafka トピック | |
| Kafka クラスターの Kafka ユーザー。 | |
| デプロイされている場合は AMQ Streams の Kafka Bridge。 |
リソースの status
プロパティーによって、リソースの下記項目の情報が提供されます。
-
status.conditions
プロパティーの Current state (現在の状態)。 -
status.observedGeneration
プロパティーの Last observed generation (最後に確認された生成)。
status
プロパティーによって、リソース固有の情報も提供されます。以下に例を示します。
-
KafkaConnectStatus
によって、Kafka Connect コネクターの REST API エンドポイントが提供されます。 -
KafkaUserStatus
によって、Kafka ユーザーの名前と、ユーザーのクレデンシャルが保存されるSecret
が提供されます。 -
KafkaBridgeStatus
によって、外部クライアントアプリケーションが Bridge サービスにアクセスできる HTTP アドレスが提供されます。
リソースの Current state (現在の状態) は、spec
プロパティーによって定義される Desired state (望ましい状態) を実現するリソースに関する進捗を追跡するのに便利です。ステータス条件によって、リソースの状態が変更された時間および理由が提供され、Operator によるリソースの望ましい状態の実現を妨げたり遅らせたりしたイベントの詳細が提供されます。
Last observed generation (最後に確認された生成) は、Cluster Operator によって最後に照合されたリソースの生成です。observedGeneration
の値が metadata.generation
の値と異なる場合、リソースの最新の更新が Operator によって処理されていません。これらの値が同じである場合、リソースの最新の変更がステータス情報に反映されます。
AMQ Streams によってカスタムリソースのステータスが作成および維持されます。定期的にカスタムリソースの現在の状態が評価され、その結果に応じてステータスが更新されます。くださいーたとえば、oc edit
を使用してカスタムリソースで更新を行う場合、その status
は編集不可能です。さらに、status
の変更は Kafka クラスターステータスの設定に影響しません。
以下では、Kafka カスタムリソースに status
プロパティーが指定されています。
Kafka カスタムリソースとステータス
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: spec: # ... status: conditions: 1 - lastTransitionTime: 2019-07-23T23:46:57+0000 status: "True" type: Ready 2 observedGeneration: 4 3 listeners: 4 - addresses: - host: my-cluster-kafka-bootstrap.myproject.svc port: 9092 type: plain - addresses: - host: my-cluster-kafka-bootstrap.myproject.svc port: 9093 certificates: - | -----BEGIN CERTIFICATE----- ... -----END CERTIFICATE----- type: tls - addresses: - host: 172.29.49.180 port: 9094 certificates: - | -----BEGIN CERTIFICATE----- ... -----END CERTIFICATE----- type: external # ...
- 1
- status の
conditions
は、既存のリソース情報から推測できないステータスに関連する基準や、リソースのインスタンスに固有する基準を記述します。 - 2
Ready
条件は、Cluster Operator が現在 Kafka クラスターでトラフィックの処理が可能であると判断するかどうかを示しています。- 3
observedGeneration
は、最後に Cluster Operator によって照合されたKafka
カスタムリソースの生成を示しています。- 4
listeners
は、現在の Kafka ブートストラップアドレスをタイプ別に示しています。重要タイプが
nodeport
の外部リスナーのカスタムリソースステータスにおけるアドレスは、現在サポートされていません。
Kafka ブートストラップアドレスがステータスに一覧表示されても、それらのエンドポイントまたは Kafka クラスターが準備状態であるとは限りません。
ステータス情報のアクセス
リソースのステータス情報はコマンドラインから取得できます。詳細は 「カスタムリソースのステータスの確認」 を参照してください。
2.3. Cluster Operator
Cluster Operator は、OpenShift クラスター内で Apache Kafka クラスターのデプロイおよび管理を行います。
2.3.1. Cluster Operator
AMQ Streams では、Cluster Operator を使用して以下のクラスターをデプロイおよび管理します。
- Kafka (ZooKeeper、Entity Operator、および Kafka Exporter を含む)
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
クラスターのデプロイメントにはカスタムリソースが使用されます。
たとえば、以下のように Kafka クラスターをデプロイします。
-
クラスター設定のある
Kafka
リソースが OpenShift クラスター内で作成されます。 -
Kafka
リソースに宣言された内容を基にして、該当する Kafka クラスターが Cluster Operator によってデプロイされます。
Cluster Operator で以下もデプロイできます (Kafka
リソースの設定より)。
-
KafkaTopic
カスタムリソースより Operator スタイルのトピック管理を提供する Topic Operator -
KafkaUser
カスタムリソースより Operator スタイルのユーザー管理を提供する User Operator
デプロイメントの Entity Operator 内の Topic Operator および User Operator 関数。
Cluster Operator のアーキテクチャー例
2.3.2. Cluster Operator デプロイメントの監視オプション
Cluster Operator の稼働中に、Kafka リソースの更新に対する監視が開始されます。
Cluster Operator はデプロイメントに応じて、以下から Kafka リソースを監視できます。
AMQ Streams では、デプロイメントの処理を簡単にするため、サンプル YAML ファイルが提供されます。
Cluster Operator では、以下のリソースの変更が監視されます。
-
Kafka クラスターの
Kafka
。 -
KafkaConnect
の Kafka Connect クラスター。 -
Source2Image がサポートされる Kafka Connect クラスターの
KafkaConnectS2I
。 -
Kafka Connect クラスターでコネクターを作成および管理するための
KafkaConnector
。 -
Kafka MirrorMaker インスタンスの
KafkaMirrorMaker
。 -
Kafka Bridge インスタンスの
KafkaBridge
。
OpenShift クラスターでこれらのリソースの 1 つが作成されると、Operator によってクラスターの詳細がリソースより取得されます。さらに、StatefulSet、Service、および ConfigMap などの必要な OpenShift リソースが作成され、リソースの新しいクラスターの作成が開始されます。
Kafka リソースが更新されるたびに、リソースのクラスターを構成する OpenShift リソースで該当する更新が Operator によって実行されます。
クラスターの望ましい状態がリソースのクラスターに反映されるようにするため、リソースへのパッチ適用後またはリソースの削除後にリソースが再作成されます。この操作は、サービスの中断を引き起こすローリングアップデートの原因となる可能性があります。
リソースが削除されると、Operator によってクラスターがアンデプロイされ、関連する OpenShift リソースがすべて削除されます。
2.3.3. 単一の namespace を監視対象とする Cluster Operator のデプロイメント
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。 Cluster Operator がインストールされる namespace に従い、インストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
手順
Cluster Operator をデプロイします。
oc apply -f install/cluster-operator -n my-namespace
2.3.4. 複数の namespace を監視対象とする Cluster Operator のデプロイメント
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。 Cluster Operator がインストールされる namespace にしたがって、インストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
手順
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルを編集し、環境変数STRIMZI_NAMESPACE
で、Cluster Operator がリソースを監視するすべての namespace を一覧表示します。以下に例を示します。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: watched-namespace-1,watched-namespace-2,watched-namespace-3
Cluster Operator によって監視されるすべての namespace (上記の例では
watched-namespace-1
、watched-namespace-2
、およびwatched-namespace-3
) に対して、RoleBindings
をインストールします。watched-namespace
は、直前のステップで使用した namespace に置き換えます。oc apply
を使用してこれを行うことができます。oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace
Cluster Operator をデプロイします。
oc apply
を使用してこれを行うことができます。oc apply -f install/cluster-operator -n my-namespace
2.3.5. すべての namespace を対象とする Cluster Operator のデプロイメント
OpenShift クラスターのすべての namespace で AMQ Streams リソースを監視するように Cluster Operator を設定できます。このモードで実行している場合、Cluster Operator によって、新規作成された namespace でクラスターが自動的に管理されます。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。 - OpenShift クラスターが稼働している必要があります。
手順
すべての namespace を監視するように Cluster Operator を設定します。
-
050-Deployment-strimzi-cluster-operator.yaml
ファイルを編集します。 STRIMZI_NAMESPACE
環境変数の値を*
に設定します。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: # ... serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: "*" # ...
-
クラスター全体ですべての namespace にアクセスできる権限を Cluster Operator に付与する
ClusterRoleBindings
を作成します。oc create clusterrolebinding
コマンドを使用します。oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-namespace:strimzi-cluster-operator
my-namespace
は、Cluster Operator をインストールする namespace に置き換えます。Cluster Operator を OpenShift クラスターにデプロイします。
oc apply
コマンドを使用します。oc apply -f install/cluster-operator -n my-namespace
2.3.6. OperatorHub からの Cluster Operator のデプロイ
OperatorHub から AMQ Streams Operator をインストールして、Cluster Operator を OpenShift クラスターにデプロイできます。OperatorHub は OpenShift 4 のみで使用できます
前提条件
-
Red Hat Operator の
OperatorSource
が OpenShift クラスターで有効になっている必要があります。適切なOperatorSource
が有効になっていれば OperatorHub に Red Hat Operator が表示されます。詳細は、『Operator』を参照してください。 - インストールには、Operator を OperatorHub からインストールするための権限を持つユーザーが必要です
手順
- OpenShift 4 Web コンソールで、Operators > OperatorHub をクリックします。
Streaming & Messaging カテゴリーの AMQ Streams Operator を検索または閲覧します。
- AMQ Streams タイルをクリックし、右側のサイドバーで Install をクリックします。
Create Operator Subscription 画面で、以下のインストールおよび更新オプションから選択します。
- Installation Mode: AMQ Streams Operator をクラスターのすべての (プロジェクト) namespace にインストール (デフォルト) するか、特定の (プロジェクト) namespace インストールするかを選択します。namespace を使用して関数を分離することが推奨されます。Kafka クラスターおよび他の AMQ Streams コンポーネントが含まれる namespace とは別に、独自の namespace に Operator をインストールすることが推奨されます。
- Approval Strategy: デフォルトでは、OLM (Operator Lifecycle Manager) によって、AMQ Streams Operator が自動的に最新の AMQ Streams バージョンにアップグレードされます。今後のアップグレードを手動で承認する場合は、Manual を選択します。詳細は、OpenShift ドキュメントの『Operator』を参照してください。
Subscribe をクリックすると、AMQ Streams Operator が OpenShift クラスターにインストールされます。
AMQ Streams Operator によって、Cluster Operator、CRD、およびロールベースアクセス制御 (RBAC) リソースは選択された namespace またはすべての namespace にデプロイされます。
Installed Operators 画面で、インストールの進捗を確認します。AMQ Streams Operator は、ステータスが InstallSucceeded に変更されると使用できます。
次に、YAML サンプルファイルを使用して、Kafka クラスターから順に AMQ Streams の他のコンポーネントをデプロイできます。
2.4. Kafka クラスター
AMQ Streams を使用して、一時または永続 Kafka クラスターを OpenShift にデプロイできます。Kafka をインストールする場合、AMQ Streams によって ZooKeeper クラスターもインストールされ、Kafka と ZooKeeper との接続に必要な設定が追加されます。
AMQ Streams を使用して、Kafka Exporter をデプロイすることもできます。
- 一時クラスター
-
通常、Kafka の一時クラスターは開発およびテスト環境での使用に適していますが、本番環境での使用には適していません。このデプロイメントでは、ブローカー情報 (ZooKeeper) と、トピックまたはパーティション (Kafka) を格納するための
emptyDir
ボリュームが使用されます。emptyDir
ボリュームを使用すると、その内容は厳密に Pod のライフサイクルと関連し、Pod がダウンすると削除されます。 - 永続クラスター
-
Kafka の永続クラスターでは、
PersistentVolumes
を使用して ZooKeeper および Kafka データを格納します。PersistentVolumeClaim
を使用してPersistentVolume
が取得され、PersistentVolume
の実際のタイプには依存しません。たとえば、YAML ファイルを変更しなくても Amazon AWS デプロイメントで Amazon EBS ボリュームを使用できます。PersistentVolumeClaim
でStorageClass
を使用し、自動ボリュームプロビジョニングをトリガーすることができます。
AMQ Streams には、Kafka クラスターをデプロイするサンプルが複数含まれています。
-
kafka-persistent.yaml
は、3 つの Zookeeper ノードと 3 つの Kafka ノードを使用して永続クラスターをデプロイします。 -
kafka-jbod.yaml
は、それぞれが複数の永続ボリューを使用する、3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、永続クラスターをデプロイします。 -
kafka-persistent-single.yaml
は、1 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、永続クラスターをデプロイします。 -
kafka-ephemeral.yaml
は、3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、一時クラスターをデプロイします。 -
kafka-ephemeral-single.yaml
は、3 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、一時クラスターをデプロイします。
サンプルクラスターの名前はデフォルトで my-cluster
になります。クラスター名はリソースの名前によって定義され、クラスターがデプロイされた後に変更できません。クラスターをデプロイする前にクラスター名を変更するには、関連する YAML ファイルのリソースの Kafka.metadata.name
プロパティーを編集します。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster # ...
2.4.1. Kafka クラスターのデプロイメント
コマンドラインで、Kafka の一時または永続クラスターを OpenShift にデプロイできます。
前提条件
- Cluster Operator がデプロイされている必要があります。
手順
クラスターを開発またはテストの目的で使用する予定である場合は、
oc apply
を使用して一時クラスターを作成およびデプロイできます。oc apply -f examples/kafka/kafka-ephemeral.yaml
クラスターを実稼働で使用する予定である場合は、
oc apply
を使用して永続クラスターを作成およびデプロイします。oc apply -f examples/kafka/kafka-persistent.yaml
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
-
Kafka
リソースによってサポートされる異なる設定オプションの詳細は、「Kafka クラスターの設定」 を参照してください。
2.5. Kafka Connect
Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。Kafka Connect では、スケーラビリティーと信頼性を維持しながら Kafka クラスターで大量のデータを出し入れするためのフレームワークが提供されます。Kafka Connect は通常、Kafka を外部データベース、ストレージシステム、およびメッセージングシステムと統合するために使用されます。
Kafka Connect では、ソースコネクター は外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。シンクコネクター は、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。コネクターのワークロードは タスク に分割されます。タスクは、Connect クラスター を構成するノード (ワーカー とも呼ばれる) の間で分散されます。これにより、メッセージのフローが非常にスケーラブルになり、信頼性が高くなります。
各コネクターは特定の コネクタークラス のインスタンスで、メッセージに関して関連する外部システムとの通信方法を認識しています。コネクターは多くの外部システムで使用でき、独自のコネクターを開発することもできます。
コネクター という用語は、Kafka Connect クラスター内で実行されているコネクターインスタンスや、コネクタークラスと同じ意味で使用されます。本ガイドでは、本文の内容で意味が明確である場合に コネクター という用語を使用します。
AMQ Stremas では以下を行うことが可能です。
- 必要なコネクターが含まれる Kafka Connect イメージの作成。
-
KafkaConnect
リソースを使用した OpenShift 内での Kafka Connect クラスターのデプロイおよび管理。 -
任意で
KafkaConnector
リソースを使用して管理された Kafka Connect クラスター内でのコネクターの実行。
Kafka Connect には、ファイルベースのデータを Kafka クラスターで出し入れするために以下の組み込みコネクターが含まれています。
ファイルコネクター | 説明 |
---|---|
| ファイル (ソース) から Kafka クラスターにデータを転送します。 |
| Kafka クラスターからファイル (シンク) にデータを転送します。 |
その他のコネクタークラスを使用するには、以下の手順の 1 つにしたがってコネクターイメージを準備する必要があります。
Cluster Operator では、Kafka Connect クラスターを OpenShift クラスターにデプロイするために作成するイメージを使用できます。
Kafka Connect クラスターは、設定可能な数量のワーカーで Deployment
として実装されます。
コネクターを作成および管理 するには、KafkaConnector
リソースを使用するか、8083 番ポートで <connect-cluster-name>-connect-api
サービスとして使用できる Kafka Connect REST API を手作業で使用します。REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。
2.5.1. Kafka Connect のクラスターへのデプロイメント
Cluster Operator を使用して、Kafka Connect クラスターを OpenShift クラスターにデプロイできます。
前提条件
- Cluster Operator をデプロイ する必要があります。
手順
oc apply
コマンドを使用して、kafka-connect.yaml
ファイルに基づいてKafkaConnect
リソースを作成します。oc apply -f examples/kafka-connect/kafka-connect.yaml
2.5.2. コネクタープラグインでの Kafka Connect の拡張
Kafka Connect の AMQ Streams コンテナーイメージには、FileStreamSourceConnector
と FileStreamSinkConnector
の 2 つの組み込みファイルコネクターが含まれています。以下を行うと、独自のコネクターを追加できます。
- Kafka Connect ベースイメージからコンテナーイメージを作成します (たとえば、手作業による作成または CI (継続インテグレーション) を使用した作成)。
- OpenShift ビルドおよび S2I (Source-to-Image) を使用してコンテナーイメージを作成します (OpenShift の場合のみ)。
2.5.2.1. Kafka Connect ベースイメージからの Docker イメージの作成
Red Hat Container Catalog の Kafka コンテナーイメージを、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。
以下の手順では、カスタムイメージを作成し、/opt/kafka/plugins
ディレクトリーに追加する方法を説明します。AMQ Stream バージョンの Kafka Connect は起動時に、/opt/kafka/plugins
ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。
前提条件
- Cluster Operator をデプロイ する必要があります。
手順
registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0
をベースイメージとして使用して、新しいDockerfile
を作成します。FROM registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
- コンテナーイメージをビルドします。
- カスタムイメージをコンテナーレジストリーにプッシュします。
新しいコンテナーイメージを示します。
以下のいずれかを行います。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。設定された場合、このプロパティーによって Cluster Operator の
STRIMZI_KAFKA_CONNECT_IMAGES
変数がオーバーライドされます。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... image: my-new-container-image
または、以下を実行します。
-
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルのSTRIMZI_KAFKA_CONNECT_IMAGES
変数を編集して新しいコンテナーイメージを示すようにした後、Cluster Operator を再インストールします。
その他のリソース
-
KafkaConnect.spec.image property
の詳細は 「コンテナーイメージ」 を参照してください。 -
STRIMZI_KAFKA_CONNECT_IMAGES
変数の詳細は 「Cluster Operator の設定」 を参照してください。
2.5.2.2. OpenShift ビルドおよび S2I (Source-to-Image) を使用したコンテナーイメージの作成
OpenShift ビルド と S2I (Source-to-Image) フレームワークを使用して、新しいコンテナーイメージを作成できます。OpenShift ビルドは、S2I がサポートされるビルダーイメージとともに、ユーザー提供のソースコードおよびバイナリーを取得し、これらを使用して新しいコンテナーイメージを構築します。構築後、コンテナーイメージは OpenShfit のローカルコンテナーイメージリポジトリーに格納され、デプロイメントで使用可能になります。
S2I がサポートされる Kafka Connect ビルダーイメージは、registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0
イメージの一部として、Red Hat Container Catalog で提供されます。このS2I イメージは、バイナリー (プラグインおよびコネクターとともに) を取得し、/tmp/kafka-plugins/s2i
ディレクトリーに格納されます。このディレクトリーから、Kafka Connect デプロイメントとともに使用できる新しい Kafka Connect イメージを作成します。改良されたイメージの使用を開始すると、Kafka Connect は /tmp/kafka-plugins/s2i
ディレクトリーからサードパーティープラグインをロードします。
手順
コマンドラインで
oc apply
コマンドを使用し、Kafka Connect の S2I クラスターを作成およびデプロイします。oc apply -f examples/kafka-connect/kafka-connect-s2i.yaml
Kafka Connect プラグインでディレクトリーを作成します。
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
oc start-build
コマンドで、準備したディレクトリーを使用してイメージの新しいビルドを開始します。oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
注記ビルドの名前は、デプロイされた Kafka Connect クラスターと同じになります。
- ビルドが完了したら、Kafka Connect のデプロイメントによって新しいイメージが自動的に使用されます。
2.5.3. コネクターの作成および管理
コネクタープラグインのコンテナーイメージを作成したら、Kafka Connect クラスターにコネクターインスタンスを作成する必要があります。その後、稼働中のコネクターインスタンスを設定、監視、および管理できます。
AMQ Streams では、コネクターの作成および管理に 2 つの API が提供されます。
-
KafkaConnector
リソース (KafkaConnectors
と呼ばれます) - Kafka Connect REST API
API を使用すると、以下を行うことができます。
- コネクターインスタンスのステータスの確認。
- 稼働中のコネクターの再設定。
- コネクターインスタンスのタスク数の増減。
-
失敗したタスクの再起動 (
KafkaConnector
リソースによってサポートされません)。 - コネクターインスタンスの一時停止。
- 一時停止したコネクターインスタンスの再開。
- コネクターインスタンスの削除。
2.5.3.1. KafkaConnector
リソース
KafkaConnectors
を使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できるため、cURL などの HTTP クライアントが必要ありません。その他の Kafka リソースと同様に、コネクターの望ましい状態を OpenShift クラスターにデプロイされた KafkaConnector
YAML ファイルに宣言し、コネクターインスタンスを作成します。
該当する KafkaConnector
を更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。該当する KafkaConnector
を削除して、コネクターを削除します。
これまでのバージョンの AMQ Streams との互換性を維持するため、KafkaConnectors
はデフォルトで無効になっています。Kafka Connect クラスターのために有効にするには、KafkaConnect
リソースでアノテーションを使用する必要があります。手順は、「KafkaConnector
リソースの有効化」 を参照してください。
KafkaConnectors
が有効になると、Cluster Operator によって監視が開始されます。KafkaConnectors
に定義された設定と一致するよう、稼働中のコネクターインスタンスの設定を更新します。
AMQ Streams には、examples/connector/source-connector.yaml
という名前のサンプル KafkaConnector
が含まれています。このサンプルを使用して、FileStreamSourceConnector
を作成および管理できます。
2.5.3.2. Kafka Connect REST API の可用性
Kafka Connect REST API は、<connect-cluster-name>-connect-api
サービスとして 8083 番ポートで使用できます。
KafkaConnectors
が有効になっている場合、Kafka Connect REST API に直接手作業で追加された変更は Cluster Operator によって元に戻されます。
2.5.4. KafkaConnector
リソースの Kafka Connect へのデプロイ
サンプル KafkaConnector
を Kafka Connect クラスターにデプロイします。YAML の例によって FileStreamSourceConnector
が作成され、ライセンスファイルの各行が my-topic
という名前のトピックでメッセージとして Kafka に送信されます。
前提条件
-
KafkaConnectors
が有効になっている Kafka Connect デプロイメントが必要です。 - 稼働中の Cluster Operator が必要です。
手順
examples/connector/source-connector.yaml
ファイルを編集します。apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" topic: my-topic # ...
OpenShift クラスターに
KafkaConnector
を作成します。oc apply -f examples/connector/source-connector.yaml
リソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=my-connect-cluster -o name
2.6. Kafka MirrorMaker
Cluster Operator によって、1 つ以上の Kafka MirrorMaker のレプリカがデプロイされ、Kafka クラスターの間でデータが複製されます。このプロセスはミラーリングと言われ、Kafka パーティションのレプリケーションの概念と混同しないようにします。MirrorMaker は、ソースクラスターからメッセージを消費し、これらのメッセージをターゲットクラスターにパブリッシュします。
リソースの例や Kafka MirrorMaker のデプロイ形式に関する詳細は、「Kafka MirrorMaker の設定」を参照してください。
2.6.1. Kafka MirrorMaker のデプロイ
前提条件
- Kafka MirrorMaker をデプロイする前に、Cluster Operator をデプロイする必要があります。
手順
コマンドラインから Kafka MirrorMaker クラスターを作成します。
oc apply -f examples/kafka-mirror-maker/kafka-mirror-maker.yaml
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
2.7. Kafka Bridge
Cluster Operator によって、1 つ以上の Kafka Bridge のレプリカがデプロイされ、HTTP API 経由で Kafka クラスターとクライアントの間でデータが送信されます。
リソースの例や Kafka Bridge のデプロイ形式に関する詳細は、「Kafka Bridge の設定」を参照してください。
2.7.1. Kafka Bridge を OpenShift クラスターへデプロイ
Cluster Operator を使用して、Kafka Bridge クラスターを OpenShift クラスターにデプロイできます。
前提条件
- Cluster Operator を OpenShift にデプロイする必要があります。
手順
oc apply
コマンドを使用して、kafka-bridge.yaml
ファイルに基づいてKafkaBridge
リソースを作成します。oc apply -f examples/kafka-bridge/kafka-bridge.yaml
その他のリソース
2.8. サンプルクライアントのデプロイ
前提条件
- クライアントが接続する既存の Kafka クラスターが必要です。
手順
プロデューサーをデプロイします。
oc run
を使用します。oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
- プロデューサーが実行しているコンソールにメッセージを入力します。
- Enter を押してメッセージを送信します。
コンシューマーをデプロイします。
oc run
を使用します。oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
- コンシューマーコンソールに受信メッセージが表示されることを確認します。
2.9. Topic Operator
Topic Operator は、OpenShift クラスター内で稼働している Kafka クラスター内の Kafka トピックを管理します。
2.9.1. Topic Operator
Topic Operator は、OpenShift リソースより Kafka クラスターのトピックを管理する方法を提供します。
Topic Operator のアーキテクチャー例
Topic Operator の役割は、対応する Kafka トピックと同期して Kafka トピックを記述する KafkaTopic
OpenShift リソースのセットを保持することです。
KafkaTopic
とトピックの関係は次のとおりです。
-
KafkaTopic
が作成されると、Topic Operator によってトピックが作成されます。 -
KafkaTopic
が削除されると、Topic Operator によってトピックが削除されます。 -
KafkaTopic
が変更されると、Topick Operator によってトピックが更新されます。
上記と逆になるトピックと KafkaTopic
の関係は次のとおりです。
-
トピックが Kafka クラスター内で作成されると、Operator によって
KafkaTopic
が作成されます。 -
トピックが Kafka クラスターから削除されると、Operator によって
KafkaTopic
が削除されます。 -
トピックが Kafka クラスターで変更されると、Operator によって
KafkaTopic
が更新されます。
このため、KafkaTopic
をアプリケーションのデプロイメントの一部として宣言でき、トピックの作成は Topic Operator によって行われます。アプリケーションは、必要なトピックからの作成または消費のみに対処する必要があります。
トピックが再設定された場合や、別の Kafka ノードに再割り当てされた場合、KafkaTopic
は常に最新の状態になります。
2.9.2. Cluster Operator を使用した Topic Operator のデプロイ
この手順では、Cluster Operator を使用して Topic Operator をデプロイする方法を説明します。AMQ Streams によって管理されない Kafka クラスターを Topic Operator と使用する場合は、Topic Operator をスタンドアロンコンポーネントとしてデプロイする必要があります。詳細は「スタンドアロン Topic Operator のデプロイ」を参照してください。
前提条件
- 稼働中の Cluster Operator が必要です。
-
作成または更新する
Kafka
リソースが必要です。
手順
Kafka.spec.entityOperator
オブジェクトがKafka
リソースに存在することを確認します。このオブジェクトによって Entity Operator が設定されます。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
-
「
EntityTopicOperatorSpec
スキーマ参照」 で説明されたプロパティーを使用して、Topic Operator を設定します。 OpenShift で Kafka リソースを作成または更新します。
oc apply
を使用します。oc apply -f your-file
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
- Entity Operator のデプロイメントに関する詳細は、「Entitiy Operator」 を参照してください。
-
Cluster Operator によってデプロイされた場合に Topic Operator の設定に使用される
Kafka.spec.entityOperator
オブジェクトに関する詳細は、「EntityOperatorSpec
スキーマ参照」 を参照してください。
2.10. User Operator
User Operator は、OpenShift クラスター内で稼働している Kafka クラスター内の Kafka ユーザーを管理します。
2.10.1. User Operator
User Operator は、Kafka ユーザーが記述される KafkaUser
リソースを監視して Kafka クラスターの Kafka ユーザーを管理し、Kafka ユーザーが Kafka クラスターで適切に設定されるようにします。
たとえば、KafkaUser
とユーザーの関係は次のようになります。
-
KafkaUser
が作成されると、User Operator によって記述されるユーザーが作成されます。 -
KafkaUser
が削除されると、User Operator によって記述されるユーザーが削除されます。 -
KafkaUser
が変更されると、User Operator によって記述されるユーザーが更新されます。
User Operator は Topic Operator とは異なり、Kafka クラスターからの変更は OpenShift リソースと同期されません。アプリケーションで直接 Kafka トピックを Kafka で作成することは可能ですが、ユーザーが User Operator と同時に直接 Kafka クラスターで管理されることは想定されません。
User Operator では、アプリケーションのデプロイメントの一部として KafkaUser
リソースを宣言できます。ユーザーの認証および承認メカニズムを指定できます。たとえば、ユーザーがブローカーへのアクセスを独占しないようにするため、Kafka リソースの使用を制御する ユーザークォータ を設定することもできます。
ユーザーが作成されると、ユーザークレデンシャルが Secret
に作成されます。アプリケーションはユーザーとそのクレデンシャルを使用して、認証やメッセージの生成または消費を行う必要があります。
User Operator は 認証のクレデンシャルを管理する他に、KafkaUser
宣言にユーザーのアクセス権限の記述を含めることで承認も管理します。
2.10.2. Cluster Operator を使用した User Operator のデプロイ
前提条件
- 稼働中の Cluster Operator が必要です。
-
作成または更新する
Kafka
リソースが必要です。
手順
-
Kafka
リソースを編集し、希望どおりに User Operator を設定するKafka.spec.entityOperator.userOperator
オブジェクトが含まれるようにします。 OpenShift で Kafka リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
-
Cluster Operator によってデプロイされた場合に Topic Operator の設定に使用される
Kafka.spec.entityOperator
オブジェクトに関する詳細は「EntityOperatorSpec
スキーマ参照」を参照してください。
2.11. Strimzi 管理者
AMQ Streams には複数のカスタムリソースが含まれています。デフォルトでは、これらのリソースを作成、編集、および削除する権限は OpenShift クラスター管理者に制限されます。クラスター管理者以外に AMQ Streams リソースを管理する権限を与える場合は、Strimzi 管理者ロールを割り当てる必要があります。
2.11.1. Strimzi 管理者の指名
前提条件
-
AMQ Streams の
CustomResourceDefinitions
がインストールされている必要があります。
手順
OpenShift で
strimzi-admin
クラスターロールを作成します。oc apply
を使用します。oc apply -f install/strimzi-admin
strimzi-admin
ClusterRole
を OpenShift クラスターの 1 人以上の既存ユーザーに割り当てます。oc create
を使用します。oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2
2.12. コンテナーイメージ
AMQ Streams のコンテナーイメージは Red Hat Container Catalog にあります。AMQ Streams によって提供されるインストール YAML ファイルは、直接 Red Hat Container Catalog からイメージをプルします。
Red Hat Container Catalog にアクセスできない場合や独自のコンテナーリポジトリーを使用する場合は以下を行います。
- リストにある すべての コンテナーイメージをプルします。
- 独自のレジストリーにプッシュします。
- インストール YAML ファイルのイメージ名を更新します。
リリースに対してサポートされる各 Kafka バージョンには別のイメージがあります。
コンテナーイメージ | namespace/リポジトリー | 説明 |
---|---|---|
Kafka |
| 次を含む、Kafka を実行するための AMQ Streams イメージ。
|
Operator |
| Operator を実行するための AMQ Streams イメージ。
|
Kafka Bridge |
| AMQ Streams Kafka Bridge を稼働するための AMQ Streams イメージ |
第3章 デプロイメント設定
本章では、サポートされるデプロイメントの異なる側面を設定する方法について説明します。
- Kafka クラスター
- Kafka Connect クラスター
- Source2Image がサポートされる Kafka Connect クラスター
- Kafka Mirror Maker
- Kafka Bridge
- OAuth 2.0 のトークンベースの認証
- OAuth 2.0 のトークンベースの承認
3.1. Kafka クラスターの設定
Kafka
リソースの完全なスキーマは 「Kafka
スキーマ参照」 に記載されています。指定の Kafka
リソースに適用されたすべてのラベルは、Kafka クラスターを構成する OpenShift リソースにも適用されます。そのため、必要に応じてリソースにラベルが適用されるため便利です。
3.1.1. Kafka YAML の設定例
Kafka デプロイメントで利用可能な設定オプションを理解するには、ここに提供されるサンプル YAML ファイルを参照してください。
例では、可能な設定オプションの一部のみを取り上げますが、特に重要なオプションは次のとおりです。
- リソース要求 (CPU/メモリー)
- 最大および最小メモリー割り当ての JVM オプション
- リスナー (および認証)
- 認証
- ストレージ
- ラックアウェアネス (Rack Awareness)
- メトリクス
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: replicas: 3 1 version: 1.4 2 resources: 3 requests: memory: 64Gi cpu: "8" limits: 4 memory: 64Gi cpu: "12" jvmOptions: 5 -Xms: 8192m -Xmx: 8192m listeners: 6 tls: authentication:7 type: tls external: 8 type: route authentication: type: tls configuration: brokerCertChainAndKey: 9 secretName: my-secret certificate: my-certificate.crt key: my-key.key authorization: 10 type: simple config: 11 auto.create.topics.enable: "false" offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 storage: 12 type: persistent-claim 13 size: 10000Gi 14 rack: 15 topologyKey: failure-domain.beta.kubernetes.io/zone metrics: 16 lowercaseOutputName: true rules: 17 # Special cases and very specific rules - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value name: kafka_server_$1_$2 type: GAUGE labels: clientId: "$3" topic: "$4" partition: "$5" # ... zookeeper: 18 replicas: 3 resources: requests: memory: 8Gi cpu: "2" limits: memory: 8Gi cpu: "2" jvmOptions: -Xms: 4096m -Xmx: 4096m storage: type: persistent-claim size: 1000Gi metrics: # ... entityOperator: 19 topicOperator: resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" userOperator: resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" kafkaExporter: 20 # ...
- 1
- レプリカは、ブローカーノードの数を指定します。
- 2
- アップグレード手順にしたがうと変更できる Kafka バージョン。
- 3
- リソース要求は、指定のコンテナーに対して予約するリソースを指定します。
- 4
- リソースの制限は、コンテナーによって消費可能な最大リソースを指定します。
- 5
- JVM オプションは、JVM の最小 (
-Xms
) および最大 (-Xmx
) メモリー割り当てを指定 できます。 - 6
- リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、
plain
(暗号化なし)、tls
、またはexternal
として設定 されます。 - 7
- リスナーの認証メカニズムは各リスナーに対して設定でき、相互 TLS または SCRAM-SHA として指定 できます。
- 8
- 外部リスナー設定は、
route
、loadbalancer
、またはnodeport
からなど、Kafka クラスターが外部の OpenShift に公開される方法 を指定します。 - 9
- 外部の認証局によって管理される Kafka リスナー証明書 の任意設定。
brokerCertChainAndKey
プロパティーは、サーバー証明書および秘密鍵を保持するSecret
を指定します。Kafka リスナー証明書も TLS リスナーに対して設定できます。 - 10
- 11
- 設定によって、ブローカー設定が指定されます。標準の Apache Kafka 設定が提供されることがあり、AMQ Streams によって直接管理されないプロパティーに限定されます。
- 12
- ストレージは、
ephemeral
、persistent-claim
、またはjbod
として設定されます。 - 13
- 14
- 15
- ラックアウェアネスは、異なるラック全体でレプリカを分散 ために設定されます。
topology
キーはクラスターノードのラベルと一致する必要があります。 - 16
- 17
- JMX Exporter でメトリクスを Grafana ダッシュボードにエクスポートする Kafka ルール。AMQ Streams によって提供されるルールのセットは Kafka リソース設定にコピーされることがあります。
- 18
- Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
- 19
- Topic Operator および User Operator の設定を指定する、Entity Operator 設定。
- 20
- データを Prometheus メトリクスとして公開するために使用される Kafka Exporter 設定。
3.1.2. データストレージに関する留意事項
効率的なデータストレージインフラストラクチャーは、AMQ Streams のパフォーマンスを最適化するために不可欠です。
ブロックストレージが必要です。NFS などのファイルストレージは、Kafka では機能しません。
ブロックストレージには、以下などを選択できます。
- Amazon Elastic Block Store (EBS)などのクラウドベースのブロックストレージソリューション。
- ローカルの永続ボリューム。
- ファイバーチャネル や iSCSI などのプロトコルがアクセスする SAN (ストレージネットワークエリア) ボリューム。
Strimzi には OpenShift の raw ブロックボリュームは必要ありません。
3.1.2.1. ファイルシステム
XFS ファイルシステムを使用するようにストレージシステムを設定することが推奨されます。AMQ Streams は ext4 ファイルシステムとも互換性がありますが、最適化するには追加の設定が必要になることがあります。
3.1.2.2. Apache Kafka および ZooKeeper ストレージ
Apache Kafka と ZooKeeper には別々のディスクを使用します。
3 つのタイプのデータストレージがサポートされます。
- 一時データストレージ (開発用のみで推奨されます)
- 永続データストレージ
- JBOD (Just a Bunch of Disks、Kafka のみに適しています)
詳細は「Kafka および ZooKeeper ストレージ」を参照してください。
ソリッドステートドライブ (SSD) は必須ではありませんが、複数のトピックに対してデータが非同期的に送受信される大規模なクラスターで Kafka のパフォーマンスを向上させることができます。SSD は、高速で低レイテンシーのデータアクセスが必要な ZooKeeper で特に有効です。
Kafka と ZooKeeper の両方にデータレプリケーションが組み込まれているため、複製されたストレージのプロビジョニングは必要ありません。
3.1.3. Kafka および ZooKeeper のストレージタイプ
Kafka および ZooKeeper はステートフルなアプリケーションであるため、データをディスクに格納する必要があります。AMQ Streams では、3 つのタイプのストレージがサポートされます。
- 一時ストレージ
- 永続ストレージ
- JBOD ストレージ
JBOD ストレージは Kafka でサポートされ、ZooKeeper ではサポートされていません。
Kafka
リソースを設定する場合、Kafka ブローカーおよび対応する ZooKeeper ノードによって使用されるストレージのタイプを指定できます。以下のリソースの storage
プロパティーを使用して、ストレージタイプを設定します。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
ストレージタイプは type
フィールドで設定されます。
Kafka クラスターをデプロイした後に、ストレージタイプを変更することはできません。
その他のリソース
- 一時ストレージの詳細は、「一時ストレージのスキーマ参照」を参照してください。
- 永続ストレージの詳細は、「永続ストレージのスキーマ参照」を参照してください。
- JBOD ストレージの詳細は、「JBOD スキーマ参照」を参照してください。
-
Kafka
のスキーマに関する詳細は、「Kafka
スキーマ参照」を参照してください。
3.1.3.1. 一時ストレージ
一時ストレージは `emptyDir` volumes
ボリュームを使用してデータを保存します。一時ストレージを使用するには、type
フィールドを ephemeral
に設定する必要があります。
emptyDir
ボリュームは永続的ではなく、保存されたデータは Pod の再起動時に失われます。新規 Pod の起動後に、クラスターの他のノードからすべてのデータを復元する必要があります。一時ストレージは、単一ノードの ZooKeeper クラスターやレプリケーション係数が 1 の Kafka トピックでの使用には適していません。これはデータが損失する原因となるからです。
一時ストレージの例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: ephemeral # ... zookeeper: # ... storage: type: ephemeral # ...
3.1.3.1.1. ログディレクトリー
一時ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。
/var/lib/kafka/data/kafka-log_idx_
-
idx
は、Kafka ブローカー Pod インデックスです。例:/var/lib/kafka/data/kafka-log0
3.1.3.2. 永続ストレージ
永続ストレージは Persistent Volume Claim (永続ボリューム要求、PVC) を使用して、データを保存するための永続ボリュームをプロビジョニングします。永続ボリューム要求を使用すると、ボリュームのプロビジョニングを行う ストレージクラス に応じて、さまざまなタイプのボリュームをプロビジョニングできます。永続ボリューム要求と使用できるデータタイプには、多くのタイプの SAN ストレージやローカル永続ボリューム などがあります。
永続ストレージを使用するには、type
を persistent-claim
に設定する必要があります。永続ストレージでは、追加の設定オプションがサポートされます。
id
(任意)-
ストレージ ID 番号。このオプションは、JBOD ストレージ宣言で定義されるストレージボリュームには必須です。デフォルトは
0
です。 size
(必須)- 永続ボリューム要求のサイズを定義します (例: 1000Gi)。
class
(任意)- 動的ボリュームプロビジョニングに使用する OpenShift の ストレージクラス。
selector
(任意)- 使用する特定の永続ボリュームを選択できます。このようなボリュームを選択するラベルを表す key:value ペアが含まれます。
deleteClaim
(任意)-
クラスターのアンデプロイ時に永続ボリューム要求を削除する必要があるかどうかを指定するブール値。デフォルトは
false
です。
既存の AMQ Streams クラスターで永続ボリュームのサイズを増やすことは、永続ボリュームのサイズ変更をサポートする OpenShift バージョンでのみサポートされます。サイズを変更する永続ボリュームには、ボリューム拡張をサポートするストレージクラスを使用する必要があります。ボリューム拡張をサポートしないその他のバージョンの OpenShift およびストレージクラスでは、クラスターをデプロイする前に必要なストレージサイズを決定する必要があります。既存の永続ボリュームのサイズを縮小することはできません。
size
が 1000Gi の永続ストレージ設定の例 (抜粋)
# ... storage: type: persistent-claim size: 1000Gi # ...
以下の例は、ストレージクラスの使用例を示しています。
特定のストレージクラスを指定する永続ストレージ設定の例 (抜粋)
# ... storage: type: persistent-claim size: 1Gi class: my-storage-class # ...
最後に、selector
を使用して特定のラベルが付いた永続ボリュームを選択し、SSD などの必要な機能を提供できます。
セレクターを指定する永続ストレージ設定の例 (抜粋)
# ... storage: type: persistent-claim size: 1Gi selector: hdd-type: ssd deleteClaim: true # ...
3.1.3.2.1. ストレージクラスのオーバーライド
デフォルトのストレージクラスを使用する代わりに、1 つ以上の Kafka ブローカーに異なるストレージクラスを指定できます。これは、ストレージクラスが、異なるアベイラビリティーゾーンやデータセンターに制限されている場合などに便利です。この場合、overrides
フィールドを使用できます。
以下の例では、デフォルトのストレージクラスの名前は my-storage-class
になります。
ストレージクラスのオーバーライドを使用した AMQ Streams クラスターの例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: labels: app: my-cluster name: my-cluster namespace: myproject spec: # ... kafka: replicas: 3 storage: deleteClaim: true size: 100Gi type: persistent-claim class: my-storage-class overrides: - broker: 0 class: my-storage-class-zone-1a - broker: 1 class: my-storage-class-zone-1b - broker: 2 class: my-storage-class-zone-1c # ...
overrides
プロパティーが設定され、ブローカーボリュームによって以下のストレージクラスが使用されます。
-
broker 0 の永続ボリュームでは
my-storage-class-zone-1a
が使用されます。 -
broker 1 の永続ボリュームでは
my-storage-class-zone-1b
が使用されます。 -
broker 2 の永続ボリュームでは
my-storage-class-zone-1c
が使用されます。
現在、overrides
プロパティーは、ストレージクラスの設定をオーバーライドするためのみに使用されます。他のストレージ設定フィールドのオーバーライドは現在サポートされていません。ストレージ設定の他のフィールドは現在サポートされていません。
3.1.3.2.2. Persistent Volume Claim (永続ボリューム要求、PVC) の命名
永続ストレージが使用されると、以下の名前で Persistent Volume Claim (永続ボリューム要求、PVC) が作成されます。
data-cluster-name-kafka-idx
-
Kafka ブローカー Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。 data-cluster-name-zookeeper-idx
-
ZooKeeper ノード Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。
3.1.3.2.3. ログディレクトリー
永続ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。
/var/lib/kafka/data/kafka-log_idx_
-
idx
は、Kafka ブローカー Pod インデックスです。例:/var/lib/kafka/data/kafka-log0
3.1.3.3. 永続ボリュームのサイズ変更
既存の AMQ Streams クラスターによって使用される永続ボリュームのサイズを増やすことで、ストレージ容量を増やすことができます。永続ボリュームのサイズ変更は、JBOD ストレージ設定で 1 つまたは複数の永続ボリュームが使用されるクラスターでサポートされます。
永続ボリュームのサイズを拡張することはできますが、縮小することはできません。永続ボリュームのサイズ縮小は、現在 OpenShift ではサポートされていません。
前提条件
- ボリュームのサイズ変更をサポートする OpenShift クラスター。
- Cluster Operator が稼働している必要があります。
- ボリューム拡張をサポートするストレージクラスを使用して作成された永続ボリュームを使用する Kafka クラスター。
手順
Kafka
リソースで、Kafka クラスター、ZooKeeper クラスター、またはその両方に割り当てられた永続ボリュームのサイズを増やします。-
Kafka クラスターに割り当てられたボリュームサイズを増やすには、
spec.kafka.storage
プロパティーを編集します。 ZooKeeper クラスターに割り当てたボリュームサイズを増やすには、
spec.zookeeper.storage
プロパティーを編集します。たとえば、ボリュームサイズを
1000Gi
から2000Gi
に増やすには、以下のように編集します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: persistent-claim size: 2000Gi class: my-storage-class # ... zookeeper: # ...
-
Kafka クラスターに割り当てられたボリュームサイズを増やすには、
リソースを作成または更新します。
oc apply
を使用します。oc apply -f your-file
OpenShift では、Cluster Operator からの要求に応じて、選択された永続ボリュームの容量が増やされます。サイズ変更が完了すると、サイズ変更された永続ボリュームを使用するすべての Pod が Cluster Operator によって再起動されます。これは自動的に行われます。
その他のリソース
OpenShift での永続ボリュームのサイズ変更に関する詳細は、「Resizing Persistent Volumes using Kubernetes」を参照してください。
3.1.3.4. JBOD ストレージの概要
AMQ Streams で、複数のディスクやボリュームのデータストレージ設定である JBOD を使用するように設定できます。JBOD は、Kafka ブローカーのデータストレージを増やす方法の 1 つです。また、パフォーマンスを向上することもできます。
JBOD 設定は 1 つまたは複数のボリュームによって記述され、各ボリュームは 一時 または 永続 ボリュームのいずれかになります。JBOD ボリューム宣言のルールおよび制約は、一時および永続ストレージのルールおよび制約と同じです。たとえば、永続ストレージのボリュームをプロビジョニング後に変更することはできません。
3.1.3.4.1. JBOD の設定
AMQ Streams で JBOD を使用するには、ストレージ type
を jbod
に設定する必要があります。volumes
プロパティーを使用すると、JBOD ストレージアレイまたは設定を構成するディスクを記述できます。以下は、JBOD 設定例の抜粋になります。
# ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false - id: 1 type: persistent-claim size: 100Gi deleteClaim: false # ...
id は、JBOD ボリュームの作成後に変更することはできません。
ユーザーは JBOD 設定に対してボリュームを追加または削除できます。
3.1.3.4.2. JBOD および 永続ボリューム要求 (PVC)
永続ストレージを使用して JBOD ボリュームを宣言する場合、永続ボリューム要求 (Persistent Volume Claim、PVC) の命名スキームは以下のようになります。
data-id-cluster-name-kafka-idx
-
id
は、Kafka ブローカー Podidx
のデータを保存するために使用されるボリュームの ID に置き換えます。
3.1.3.4.3. ログディレクトリー
JBOD ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。
/var/lib/kafka/data-id/kafka-log_idx_
-
id
は、Kafka ブローカー Podidx
のデータを保存するために使用されるボリュームの ID に置き換えます。例:/var/lib/kafka/data-0/kafka-log0
3.1.3.5. JBOD ストレージへのボリュームの追加
この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターにボリュームを追加する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。
以前使用され、削除された id
の下に新規ボリュームを追加する場合、以前使用された PersistentVolumeClaims
が必ず削除されているよう確認する必要があります。
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
- JBOD ストレージのある Kafka クラスター。
手順
Kafka
リソースのspec.kafka.storage.volumes
プロパティーを編集します。新しいボリュームをvolumes
アレイに追加します。たとえば、id が2
の新しいボリュームを追加します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false - id: 1 type: persistent-claim size: 100Gi deleteClaim: false - id: 2 type: persistent-claim size: 100Gi deleteClaim: false # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
- 新しいトピックを作成するか、既存のパーティションを新しいディスクに再度割り当てします。
その他のリソース
トピックの再割り当てに関する詳細は 「パーティションの再割り当て」 を参照してください。
3.1.3.6. JBOD ストレージからのボリュームの削除
この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターからボリュームを削除する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。JBOD ストレージには、常に 1 つのボリュームが含まれている必要があります。
データの損失を避けるには、ボリュームを削除する前にすべてのパーティションを移動する必要があります。
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
- 複数のボリュームがある JBOD ストレージのある Kafka クラスター。
手順
- 削除するディスクからすべてのパーティションを再度割り当てます。削除するディスクに割り当てられたままになっているパーティションのデータは削除される可能性があります。
Kafka
リソースのspec.kafka.storage.volumes
プロパティーを編集します。volumes
アレイから 1 つまたは複数のボリュームを削除します。たとえば、ID が1
と2
のボリュームを削除します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
トピックの再割り当てに関する詳細は 「パーティションの再割り当て」 を参照してください。
3.1.4. Kafka ブローカーレプリカ
Kafka クラスターは多くのブローカーを使って実行できます。Kafka.spec.kafka.replicas
の Kafka クラスターに使用されるブローカーの数を設定できます。クラスターに最適なブローカー数は、特定のユースケースに基づいて決定する必要があります。
3.1.4.1. ブローカーノード数の設定
この手順では、新規クラスターの Kafka ブローカーノードの数を設定する方法を説明します。これは、パーティションのない新しいクラスターのみに適用できます。クラスターにトピックがすでに定義されている場合は、「クラスターのスケーリング」を参照してください。
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
- トピックが定義されていない Kafka クラスター。
手順
Kafka
リソースのreplicas
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... replicas: 3 # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
クラスターにトピックがすでに定義されている場合は、「クラスターのスケーリング」を参照してください。
3.1.5. Kafka ブローカーの設定
AMQ Streams では、Kafka クラスターの Kafka ブローカーの設定をカスタマイズできます。Apache Kafka ドキュメント の「Broker Configs」セクションに記載されているほとんどのオプションを指定および設定できます。以下に関係する設定オプションは設定できません。
- セキュリティー (暗号化、認証、および承認)
- リスナーの設定
- Broker ID の設定
- ログデータディレクトリーの設定
- ブローカー間の通信
- ZooKeeper の接続
これらのオプションは AMQ Streams によって自動的に設定されます。
3.1.5.1. Kafka ブローカーの設定
Kafka.spec.kafka
の config
プロパティーには Kafka ブローカー設定オプションがキーとして含まれ、それらの値は以下の JSON タイプの 1 つになります。
- 文字列
- Number
- ブール値
AMQ Streams によって直接管理されるオプション以外は、Apache Kafka ドキュメント の「Broker Configs」セクションにあるすべてのオプションを指定および設定できます。以下の文字列の 1 つと同じキーまたは以下の文字列の 1 つで始まるキーを持つ設定オプションはすべて変更できません。
-
listeners
-
advertised.
-
broker.
-
listener.
-
host.name
-
port
-
inter.broker.listener.name
-
sasl.
-
ssl.
-
security.
-
password.
-
principal.builder.class
-
log.dir
-
zookeeper.connect
-
zookeeper.set.acl
-
authorizer.
-
super.user
制限されたオプションが config
プロパティーに指定された場合、そのオプションは無視され、Cluster Operator のログファイルに警告メッセージが出力されます。サポートされるその他すべてのオプションは Kafka に渡されます。
Kafka ブローカー設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... config: num.partitions: 1 num.recovery.threads.per.data.dir: 1 default.replication.factor: 3 offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 1 log.retention.hours: 168 log.segment.bytes: 1073741824 log.retention.check.interval.ms: 300000 num.network.threads: 3 num.io.threads: 8 socket.send.buffer.bytes: 102400 socket.receive.buffer.bytes: 102400 socket.request.max.bytes: 104857600 group.initial.rebalance.delay.ms: 0 # ...
3.1.5.2. Kafka ブローカーの設定
既存の Kafka ブローカーを設定するか、指定した設定で新しい Kafka ブローカーを作成します。
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.kafka.config
プロパティーで、Kafka 設定を 1 つまたは複数入力します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... config: default.replication.factor: 3 offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 1 # ... zookeeper: # ...
新しい設定を適用してリソースを作成または更新します。
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
3.1.6. Kafka ブローカーリスナー
Kafka ブローカーで有効なリスナーを設定できます。以下のタイプのリスナーがサポートされます。
- ポート 9092 のプレーンリスナー (TLS による暗号化なし)
- ポート 9093 の TLS リスナー (TLS による暗号化を使用)
- OpenShift の外部からアクセスするためのポート 9094 の外部リスナー
OAuth 2.0
OAuth 2.0 トークンベースの認証を使用している場合、承認サーバーに接続するようにリスナーを設定できます。詳細は、「OAuth 2.0 トークンベース認証の使用」を参照してください。
リスナー証明書
TLS による暗号化が有効になっている TLS または外部リスナーの Kafka リスナー証明書 と呼ばれる独自のサーバー証明書を提供できます。詳細は 「Kafka リスナー証明書」 を参照してください。
3.1.6.1. Kafka リスナー
Kafka.spec.kafka
リソースの listeners
プロパティーを使用して Kafka ブローカーリスナーを設定できます。listeners
プロパティーには 3 つのサブプロパティーが含まれます。
-
plain
-
tls
-
external
各リスナーは、listeners
オブジェクトに指定のプロパティーがある場合にのみ定義されます。
すべてのリスナーが有効な listeners
プロパティーの例
# ... listeners: plain: {} tls: {} external: type: loadbalancer # ...
プレーンリスナーのみが有効な listeners
プロパティーの例
# ... listeners: plain: {} # ...
3.1.6.2. Kafka リスナーの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka.spec.kafka
リソースのlisteners
プロパティーを編集します。認証のないプレーン (暗号化されていない) リスナーの設定例:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: plain: {} # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
-
スキーマの詳細は、「
KafkaListeners
スキーマ参照」を参照してください。
3.1.6.3. リスナー認証
リスナーの authentication
プロパティーは、そのリスナーに固有の認証メカニズムを指定するために使用されます。
- 相互 TLS 認証 (TLS による暗号化のリスナーのみ)
- SCRAM-SHA 認証
authentication
プロパティーが指定されていない場合、リスナーはそのリスナー経由で接続するクライアントを認証しません。
認証は、User Operator を使用して KafkaUsers
を管理する場合に設定する必要があります。
3.1.6.3.1. リスナーの認証設定
以下の例で指定されるものは次のとおりです。
-
SCRAM-SHA 認証に設定された
plain
リスナー -
相互 TLS 認証を使用する
tls
リスナー -
相互 TLS 認証を使用する
external
リスナー
リスナー認証設定の例
# ... listeners: plain: authentication: type: scram-sha-512 tls: authentication: type: tls external: type: loadbalancer tls: true authentication: type: tls # ...
3.1.6.3.2. 相互 TLS 認証
相互 TLS 認証は、Kafka ブローカーと ZooKeeper Pod 間の通信で常に使用されます。
相互認証または双方向認証は、サーバーとクライアントの両方が証明書を提示するときに使用されます。AMQ Streams では、Kafka が TLS (Transport Layer Security) を使用して、相互認証の有無を問わず、Kafka ブローカーとクライアントとの間で暗号化された通信が行われるよう設定できます。相互認証を設定する場合、ブローカーによってクライアントが認証され、クライアントによってブローカーが認証されます。
TLS 認証は一般的には一方向で、一方が他方のアイデンティティーを認証します。たとえば、Web ブラウザーと Web サーバーの間で HTTPS が使用される場合、サーバーはブラウザーのアイデンティティーの証明を取得します。
3.1.6.3.2.1. クライアントに相互 TLS 認証を使用する場合
以下の場合、Kafka クライアントの認証に相互 TLS 認証が推奨されます。
- 相互 TLS 認証を使用した認証がクライアントでサポートされる場合。
- パスワードの代わりに TLS 証明書を使用する必要がある場合。
- 期限切れの証明書を使用しないように、クライアントアプリケーションを定期的に再設定および再起動できる場合。
3.1.6.3.3. SCRAM-SHA 認証
SCRAM (Salted Challenge Response Authentication Mechanism) は、パスワードを使用して相互認証を確立できる認証プロトコルです。AMQ Streams では、Kafka が SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 を使用するよう設定し、暗号化されていないクライアントの接続と TLS で暗号化されたクライアントの接続の両方で認証を提供できます。TLS 認証は、Kafka ブローカーと ZooKeeper ノードの間で常に内部で使用されます。TLS クライアント接続で TLS プロトコルを使用すると、接続が暗号化されますが、認証には使用されません。
SCRAM の以下のプロパティーは、暗号化されていない接続でも SCRAM-SHA を安全に使用できるようにします。
- 通信チャネル上では、パスワードはクリアテキストで送信されません。代わりに、クライアントとサーバーはお互いにチャレンジを生成し、認証するユーザーのパスワードを認識していることを証明します。
- サーバーとクライアントは、認証を交換するたびに新しいチャレンジを生成します。よって、交換はリレー攻撃に対して回復性を備えています。
3.1.6.3.3.1. サポートされる SCRAM クレデンシャル
AMQ Streams では SCRAM-SHA-512 のみがサポートされます。KafkaUser.spec.authentication.type
を scram-sha-512
に設定すると、User Operator によって、大文字と小文字の ASCII 文字と数字で構成された無作為の 12 文字のパスワードが生成されます。
3.1.6.3.3.2. クライアントに SCRAM-SHA 認証を使用する場合
以下の場合、Kafka クライアントの認証に SCRAM-SHA が推奨されます。
- SCRAM-SHA-512 を使用した認証がクライアントでサポートされる場合。
- TLS 証明書の代わりにパスワードを使用する必要がある場合。
- 暗号化されていない通信に認証が必要な場合。
3.1.6.4. 外部リスナー
外部リスナーを使用して AMQ Streams の Kafka クラスターを OpenShift 環境外のクライアントに公開します。
その他のリソース
3.1.6.4.1. 外部リスナーでアドバタイズされたアドレスのカスタマイズ
デフォルトでは、AMQ Streams は Kafka クラスターがそのクライアントにアドバタイズするホスト名とポートを自動的に決定しようとします。AMQ Streams が稼働しているインフラストラクチャーでは Kafka にアクセスできる正しいホスト名やポートを提供しない可能性があるため、デフォルトの動作はすべての状況に適しているわけではありません。外部リスナーの overrides
プロパティーで、アドバタイズされたホスト名およびポートをカスタマイズできます。その後、TLS ホスト名の検証で使用できるようにするため、AMQ Streams では Kafka ブローカーでアドバタイズされたアドレスが自動的に設定され、ブローカー証明書に追加されます。アドバタイズされたホストおよびポートのオーバーライドは、すべてのタイプの外部リスナーで利用できます。
アドバタイズされたアドレスのオーバーライドが設定された外部リスナーの例
# ... listeners: external: type: route authentication: type: tls overrides: brokers: - broker: 0 advertisedHost: example.hostname.0 advertisedPort: 12340 - broker: 1 advertisedHost: example.hostname.1 advertisedPort: 12341 - broker: 2 advertisedHost: example.hostname.2 advertisedPort: 12342 # ...
さらに、ブートストラップサービスの名前を指定することもできます。この名前はブローカー証明書に追加され、TLS ホスト名の検証に使用できます。すべてのタイプの外部リスナーで、ブートストラップアドレスを追加できます。
追加のブートストラップアドレスが設定された外部リスナーの例
# ... listeners: external: type: route authentication: type: tls overrides: bootstrap: address: example.hostname # ...
3.1.6.4.2. ルート外部リスナー
タイプ route
の外部リスナーは、OpenShift のRoutes
および HAProxy ルーターを使用して Kafka を公開します。
route
は OpenShift でのみサポートされます。
3.1.6.4.2.1. OpenShift Routes
を使用した Kafka の公開
OpenShift Routes
および HAProxy ルーターを使用して Kafka を公開する場合、各 Kafka ブローカー Pod に専用の Route
が作成されます。追加の Route
が作成され、Kafka ブートストラップアドレスとして提供されます。これらの Routes
を使用すると、Kafka クライアントを 443 番ポートで Kafka に接続することができます。
TLS による暗号化は常に Routes
と使用されます。
デフォルトでは、ルートホストは OpenShift によって自動的に割り当てられます。ただし、 overrides
プロパティーに要求されたホストを指定すると、割り当てられたルートをオーバーライドすることができます。AMQ Streams では、要求されたホストが利用可能であるか検証されません。そのため、ホストが利用可能であることをユーザーが確認する必要があります。
OpenShift ルートホストのオーバーライドが設定されたタイプ routes
の外部リスナーの例
# ... listeners: external: type: route authentication: type: tls overrides: bootstrap: host: bootstrap.myrouter.com brokers: - broker: 0 host: broker-0.myrouter.com - broker: 1 host: broker-1.myrouter.com - broker: 2 host: broker-2.myrouter.com # ...
Routes
を使用した Kafka へのアクセスに関する詳細は 「OpenShift ルートを使用した Kafka へのアクセス」 を参照してください。
3.1.6.4.2.2. OpenShift ルートを使用した Kafka へのアクセス
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
外部リスナーが有効で、タイプ
route
に設定されている Kafka クラスターをデプロイします。Routes
を使用するよう設定された外部リスナーがある設定の例apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: external: type: route # ... # ... zookeeper: # ...
リソースを作成または更新します。
oc apply -f your-file
ブートストラップ
Route
のアドレスを見つけます。oc get routes _cluster-name_-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'
このアドレスと Kafkaクライアントの 443 番ポートをブートストラップアドレスとして使用します。
ブローカーの認証局の公開証明書を取得します。
oc get secret _<cluster-name>_-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。
その他のリソース
-
スキーマの詳細は、「
KafkaListeners
スキーマ参照」を参照してください。
3.1.6.4.3. ロードバランサー外部リスナー
タイプが loadbalancer
の外部リスナーは、Loadbalancer
タイプの Services
を使用して、Kafka を公開します。
3.1.6.4.3.1. ロードバランサーを使用した Kafka の公開
Loadbalancer
タイプの Services
を使用して Kafka を公開すると、Kafka ブローカー Pod ごとに新しいロードバランサーサービスが作成されます。追加のロードバランサーが作成され、Kafkaの ブートストラップ アドレスとして提供されます。ロードバランサーは 9094 番ポートで接続をリッスンします。
デフォルトでは、TLS による暗号化は有効になっています。これを無効にするには、tls
フィールドを false
に設定します。
タイプが loadbalancer
の外部リスナーの例
# ... listeners: external: type: loadbalancer authentication: type: tls # ...
ロードバランサーを使用した Kafka へのアクセスに関する詳細は 「ロードバランサーを使用した Kafka へのアクセス」 を参照してください。
3.1.6.4.3.2. 外部ロードバランサーリスナーの DNS 名のカスタマイズ
loadbalancer
リスナーでは、dnsAnnotations
プロパティーを使用して追加のアノテーションをロードバランサーサービスに追加できます。これらのアノテーションを使用すると、自動的に DNS 名をロードバランサーサービスに割り当てる ExternalDNS などの DNS ツールをインストルメント化できます。
dnsAnnotations
を使用するタイプ loadbalancer
の外部リスナーの例
# ... listeners: external: type: loadbalancer authentication: type: tls overrides: bootstrap: dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" brokers: - broker: 0 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" - broker: 1 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" - broker: 2 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" # ...
3.1.6.4.3.3. ロードバランサー IP アドレスのカスタマイズ
loadbalancer
リスナーで、ロードバランサーの作成時に loadBalancerIP
プロパティーを使用すると、特定の IP アドレスをリクエストできます。特定の IP アドレスでロードバランサーを使用する必要がある場合は、このプロパティーを使用します。クラウドプロバイダーがこの機能に対応していない場合、loadBalancerIP
フィールドは無視されます。
特定のロードバランサー IP アドレスリクエストのある loadbalancer
タイプの外部リスナーの例
# ... listeners: external: type: loadbalancer authentication: type: tls overrides: bootstrap: loadBalancerIP: 172.29.3.10 brokers: - broker: 0 loadBalancerIP: 172.29.3.1 - broker: 1 loadBalancerIP: 172.29.3.2 - broker: 2 loadBalancerIP: 172.29.3.3 # ...
3.1.6.4.3.4. ロードバランサーを使用した Kafka へのアクセス
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
外部リスナーが有効で、タイプ
loadbalancer
に設定されている Kafka クラスターをデプロイします。ロードバランサーを使用するよう設定された外部リスナーがある設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: external: type: loadbalancer authentication: type: tls # ... # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
ブートストラップロードバランサーのホスト名を見つけます。
oc get
を使用してこれを行うことができます。oc get service cluster-name-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}{"\n"}'
ホスト名が見つからない場合 (コマンドによって返されなかった場合)、ロードバランサーの IP アドレスを使用します。
oc get
を使用してこれを行うことができます。oc get service cluster-name-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'
ホスト名または IP アドレスと Kafkaクライアントの 9094 番ポートをブートストラップアドレスとして使用します。
TLS による暗号化が無効になっている場合を除き、ブローカーの認証局の公開証明書を取得します。
oc get
を使用してこれを行うことができます。oc get secret cluster-name-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。
その他のリソース
-
スキーマの詳細は、「
KafkaListeners
スキーマ参照」を参照してください。
3.1.6.4.4. ノードポートの外部リスナー
タイプが nodeport
の外部リスナーは、NodePort
タイプの Services
を使用して、Kafka を公開します。
3.1.6.4.4.1. ノードポートを使用した Kafka の公開
NodePort
タイプの Services
を使用して Kafka を公開する場合、Kafka クライアントは OpenShift のノードに直接接続されます。各クライアントの OpenShift ノード上のポートへのアクセスを有効にする必要があります (ファイアウォール、セキュリティーグループなど)。各 Kafka ブローカー Pod が別々のポートでアクセス可能になります。追加の NodePort
型 Service
が作成され、Kafka ブートストラップアドレスとして提供されます。
Kafka ブローカー Pod にアドバタイズされたアドレスを設定する場合、AMQ Stremas では該当の Pod が稼働しているノードのアドレスが使用されます。ノードアドレスを選択する場合、以下の優先順位で異なるアドレスタイプが使用されます。
- ExternalDNS
- ExternalIP
- Hostname
- InternalDNS
- InternalIP
デフォルトでは、TLS による暗号化は有効になっています。これを無効にするには、tls
フィールドを false
に設定します。
ノードポートを使用して Kafka クラスターを公開する場合、現在 TLS ホスト名の検証はサポートされません。
デフォルトでは、ブートストラップおよびブローカーサービスに使用されるポート番号は OpenShift によって自動的に割り当てられます。ただし、overrides
プロパティーに要求されたポート番号を指定すると、割り当てられたノードポートをオーバーライドすることができます。AMQ Streams では、要求されたポートで検証を実行しません。そのため、ポートが使用可能であることをユーザーが確認する必要があります。
ノードポートのオーバーライドが設定された外部リスナーの例
# ... listeners: external: type: nodeport tls: true authentication: type: tls overrides: bootstrap: nodePort: 32100 brokers: - broker: 0 nodePort: 32000 - broker: 1 nodePort: 32001 - broker: 2 nodePort: 32002 # ...
ノードポートを使用した Kafka へのアクセスに関する詳細は 「ノードポートを使用した Kafka へのアクセス」 を参照してください。
3.1.6.4.4.2. 外部ノードポートリスナーの DNS 名のカスタマイズ
nodeport
リスナーでは、dnsAnnotations
プロパティーを使用して追加のアノテーションをノードポートサービスに追加できます。これらのアノテーションを使用すると、自動的に DNS 名をクラスターノードに割り当てる ExternalDNS などの DNS ツールをインストルメント化できます。
dnsAnnotations
を使用するタイプ nodeport
の外部リスナーの例
# ... listeners: external: type: nodeport tls: true authentication: type: tls overrides: bootstrap: dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" brokers: - broker: 0 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" - broker: 1 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" - broker: 2 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com. external-dns.alpha.kubernetes.io/ttl: "60" # ...
3.1.6.4.4.3. ノードポートを使用した Kafka へのアクセス
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
外部リスナーが有効で、タイプ
nodeport
に設定されている Kafka クラスターをデプロイします。ノードポートを使用するよう設定された外部リスナーがある設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: external: type: nodeport tls: true # ... # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
ブートストラップサービスのポート番号を見つけます。
oc get
を使用してこれを行うことができます。oc get service cluster-name-kafka-external-bootstrap -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'
Kafka ブートストラップアドレスでポートを使用する必要があります。
OpenShift ノードのアドレスを見つけます。
oc get
を使用してこれを行うことができます。oc get node node-name -o=jsonpath='{range .status.addresses[*]}{.type}{"\t"}{.address}{"\n"}'
異なるアドレスが返される場合は、以下の順序を基にしてアドレスタイプを選択します。
- ExternalDNS
- ExternalIP
- Hostname
- InternalDNS
InternalIP
アドレスと前述のステップで見つけたポートを、Kafka ブートストラップアドレスで使用します。
TLS による暗号化が無効になっている場合を除き、ブローカーの認証局の公開証明書を取得します。
oc get
を使用してこれを行うことができます。oc get secret cluster-name-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。
その他のリソース
-
スキーマの詳細は、「
KafkaListeners
スキーマ参照」を参照してください。
3.1.6.4.5. OpenShift Ingress 外部リスナー
タイプが ingress
の外部リスナーは、Kubernetes Ingress
と NGINX Ingress Controller for Kubernetes を使用して Kafka を公開します。
3.1.6.4.5.1. Kubernetes Ingress
を使用した Kafka の公開
Kubernetes Ingress
と NGINX Ingress Controller for Kubernetes を使用して Kafka が公開されると、Kafka ブローカー Pod ごとに専用の Ingress
リソースが作成されます。追加の Ingress
リソースが作成され、Kafka ブートストラップアドレスとして提供されます。これらの Ingress
リソースを使用すると、Kafka クライアントを 443 番ポートで Kafka に接続することができます。
Ingress
を使用する外部リスナーは、現在 NGINX Ingress Controller for Kubernetes でのみテストされています。
AMQ Streams では、NGINX Ingress Controller for Kubernetes の TLS パススルー機能が使用されます。TLS パススルーが NGINX Ingress Controller for Kubernetes デプロイメントで有効になっていることを確認してください。TLS パススルーの有効化に関する詳細は、TLS パススルーのドキュメント を参照してください。Ingress
を使用して Kafka を公開する場合、TLS パススルー機能を使用するため、TLS による暗号化を無効にできません。
Ingress コントローラーはホスト名を自動的に割り当てません。spec.kafka.listeners.external.configuration
セクションに、ブートストラップおよびブローカーごとのサービスによって使用されるホスト名を指定する必要があります。また、確実にホスト名が Ingress エンドポイントに解決することを確認する必要があります。AMQ Streams では、要求されたホストが利用可能で、適切に Ingress エンドポイントにルーティングされることを検証しません。
タイプが ingress
の外部リスナーの例
# ... listeners: external: type: ingress authentication: type: tls configuration: bootstrap: host: bootstrap.myingress.com brokers: - broker: 0 host: broker-0.myingress.com - broker: 1 host: broker-1.myingress.com - broker: 2 host: broker-2.myingress.com # ...
Ingress
を使用した Kafka へのアクセスに関する詳細は 「ingress を使用した Kafka へのアクセス」 を参照してください。
3.1.6.4.5.2. Ingress
クラスの設定
デフォルトでは、Ingress
クラスは nginx
に設定されます。class
プロパティーを使用して Ingress
クラスを変更できます。
Ingress
クラス nginx-internal
を使用するタイプ ingress
の外部リスナーの例
# ... listeners: external: type: ingress class: nginx-internal # ... # ...
3.1.6.4.5.3. 外部 ingress リスナーの DNS 名のカスタマイズ
ingress
リスナーでは、dnsAnnotations
プロパティーを使用して追加のアノテーションを ingress リソースに追加できます。これらのアノテーションを使用すると、自動的に DNS 名を ingress リソースに割り当てる ExternalDNS などの DNS ツールをインストルメント化できます。
dnsAnnotations
を使用するタイプ ingress
の外部リスナーの例
# ... listeners: external: type: ingress authentication: type: tls configuration: bootstrap: dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: bootstrap.myingress.com. external-dns.alpha.kubernetes.io/ttl: "60" host: bootstrap.myingress.com brokers: - broker: 0 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: broker-0.myingress.com. external-dns.alpha.kubernetes.io/ttl: "60" host: broker-0.myingress.com - broker: 1 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: broker-1.myingress.com. external-dns.alpha.kubernetes.io/ttl: "60" host: broker-1.myingress.com - broker: 2 dnsAnnotations: external-dns.alpha.kubernetes.io/hostname: broker-2.myingress.com. external-dns.alpha.kubernetes.io/ttl: "60" host: broker-2.myingress.com # ...
3.1.6.4.5.4. ingress を使用した Kafka へのアクセス
以下の手順では、Ingress を使用して OpenShift の外部から AMQ Streams Kafka クラスターにアクセスする方法を説明します。
前提条件
- OpenShift クラスター。
- TLS パススルーが有効になっている、デプロイ済みの NGINX Ingress Controller for Kubernetes。
- 稼働中の Cluster Operator が必要です。
手順
外部リスナーが有効で、タイプ
ingress
に設定されている Kafka クラスターをデプロイします。Ingress
を使用するよう設定された外部リスナーがある設定の例apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: external: type: ingress authentication: type: tls configuration: bootstrap: host: bootstrap.myingress.com brokers: - broker: 0 host: broker-0.myingress.com - broker: 1 host: broker-1.myingress.com - broker: 2 host: broker-2.myingress.com # ... zookeeper: # ...
-
configuration
セクションのホストが適切に Ingress エンドポイントに解決することを確認してください。 リソースを作成または更新します。
oc apply -f your-file
ブローカーの認証局の公開証明書を取得します。
oc get secret cluster-name-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
- Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。443 番ポートで、クライアントを設定で指定したホストに接続します。
その他のリソース
-
スキーマの詳細は、「
KafkaListeners
スキーマ参照」を参照してください。
3.1.6.5. ネットワークポリシー
AMQ Streams では、Kafka ブローカーで有効になっているリスナーごとに NetworkPolicy
リソースが自動的に作成されます。デフォルトでは、すべてのアプリケーションと namespace にアクセスする権限が NetworkPolicy
によってリスナーに付与されます。
ネットワークレベルでのリスナーへのアクセスを指定のアプリケーションまたは namespace のみに制限するには、networkPolicyPeers
フィールドを使用します。
認証および承認と合わせてネットワークポリシーを使用します。
リスナーごとに、異なる networkPolicyPeers
設定を指定できます。
3.1.6.5.1. リスナーのネットワークポリシー設定
以下に、plain
および tls
リスナーの networkPolicyPeers
設定の例を示します。
# ... listeners: plain: authentication: type: scram-sha-512 networkPolicyPeers: - podSelector: matchLabels: app: kafka-sasl-consumer - podSelector: matchLabels: app: kafka-sasl-producer tls: authentication: type: tls networkPolicyPeers: - namespaceSelector: matchLabels: project: myproject - namespaceSelector: matchLabels: project: myproject2 # ...
この例では以下が設定されています。
-
ラベル
app: kafka-sasl-consumer
およびapp: kafka-sasl-producer
と一致するアプリケーション Pod のみがplain
リスナーに接続できます。アプリケーション Pod は Kafka ブローカーと同じ namespace で実行されている必要があります。 -
ラベル
project: myproject
およびproject: myproject2
と一致する namespace で稼働するアプリケーション Pod のみがtls
リスナーに接続できます。
networkPolicyPeers
フィールドの構文は、NetworkPolicy
リソースの from
フィールドと同じです。スキーマの詳細は、「NetworkPolicyPeer API reference」および「KafkaListeners
スキーマ参照」を参照してください。
AMQ Streams でネットワークポリシーを使用するには、ingress NetworkPolicies が OpenShift の設定でサポートされる必要があります。
3.1.6.5.2. networkPolicyPeers
を使用した Kafka リスナーへのアクセス制限
networkPolicyPeers
フィールドを使用すると、リスナーへのアクセスを指定のアプリケーションのみに制限できます。
前提条件
- Ingress NetworkPolicies をサポートする OpenShift クラスター。
- Cluster Operator が稼働している必要があります。
手順
-
Kafka
リソースを開きます。 networkPolicyPeers
フィールドで、Kafka クラスターへのアクセスが許可されるアプリケーション Pod または namespace を定義します。以下は、ラベル
app
がkafka-client
に設定されているアプリケーションからの接続のみを許可するようtls
リスナーを設定する例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: tls: networkPolicyPeers: - podSelector: matchLabels: app: kafka-client # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用します。oc apply -f your-file
その他のリソース
-
スキーマの詳細は、「NetworkPolicyPeer API reference」および「
KafkaListeners
スキーマ参照」を参照してください。
3.1.7. 認証および承認
AMQ Streams では認証および承認がサポートされます。認証は、リスナーごとに独立して設定できます。承認は、常に Kafka クラスター全体に対して設定されます。
3.1.7.1. 認証
認証は、authentication
プロパティーの リスナー設定 の一部として設定されます。認証メカニズムは type
フィールドで定義されます。
authentication
プロパティーがない場合、指定のリスナーで認証が有効になりません。認証がないと、リスナーではすべての接続が許可されます。
サポートされる認証メカニズム
- TLS クライアント認証
- SASL SCRAM-SHA-512
- OAuth 2.0 のトークンベースの認証
3.1.7.1.1. TLS クライアント認証
TLS クライアント認証は、type
を tls
に指定して有効にします。TLS クライアント認証は tls
リスナーでのみサポートされます。
タイプ tls
の authentication
の例
# ... authentication: type: tls # ...
3.1.7.2. Kafka ブローカーでの認証の設定
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.kafka.listeners
プロパティーで、認証を有効にするリスナーにauthentication
フィールドを追加します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... listeners: tls: authentication: type: tls # ... zookeeper: # ...
新しい設定を適用してリソースを作成または更新します。
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
その他のリソース
- サポートされる認証メカニズムの詳細は、「認証」を参照してください。
-
Kafka
のスキーマに関する詳細は、「Kafka
スキーマ参照」を参照してください。
3.1.7.3. 承認
Kafka.spec.kafka
リソースの authorization
プロパティーを使用して Kafka ブローカーの承認を設定できます。authorization
プロパティーがないと、承認が有効になりません。承認を有効にすると、承認は有効なすべての リスナー に適用されます。承認方法は type
フィールドで定義されます。
以下を設定できます。
- 簡易承認
- OAuth 2.0 での承認 (OAuth 2.0 トークンベースの認証を使用している場合)
3.1.7.3.1. 簡易承認
AMQ Streams の簡易承認では、SimpleAclAuthorizer
が使用されます。これは、Apache Kafka で提供されるデフォルトのアクセス制御リスト (ACL) 承認プラグインです。ACL を使用すると、ユーザーがアクセスできるリソースを細かく定義できます。簡易承認を有効にするには、type
フィールドを simple
に設定します。
簡易承認の例
# ... authorization: type: simple # ...
ユーザーのアクセスルールは、アクセス制御リスト (ACL) を使用して定義されます。必要に応じて、superUsers
フィールドにスーパーユーザーのリストを指定できます。
3.1.7.3.2. スーパーユーザー
スーパーユーザーは、ACL で定義されたアクセス制限に関係なく、Kafka クラスターのすべてのリソースにアクセスできます。Kafka クラスターのスーパーユーザーを指定するには、superUsers
フィールドにユーザープリンシパルのリストを入力します。ユーザーが TLS クライアント認証を使用する場合、ユーザー名は CN=
で始まる証明書のサブジェクトの共通名になります。
スーパーユーザーの指定例
# ... authorization: type: simple superUsers: - CN=fred - sam - CN=edward # ...
Kafka.spec.kafka
の config
プロパティーにある super.user
設定オプションは無視されます。この代わりに、authorization
プロパティーでスーパーユーザーを指定します。詳細は「Kafka ブローカーの設定」を参照してください。
3.1.7.4. Kafka ブローカーでの承認の設定
承認を設定し、特定の Kafka ブローカーのスーパーユーザーを指定します。
前提条件
- OpenShift クラスター。
- Cluster Operator が稼働している必要があります。
手順
Kafka.spec.kafka
リソースでauthorization
プロパティーを追加または編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... authorization: type: simple superUsers: - CN=fred - sam - CN=edward # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
- サポートされる承認方法の詳細は、「承認」を参照してください。
-
Kafka
のスキーマに関する詳細は、「Kafka
スキーマ参照」を参照してください。 - ユーザー認証の設定に関する詳細は、「Kafka User リソース」を参照してください。
3.1.8. ZooKeeper レプリカ
通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。
効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを損失すると、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。AMQ Streams では、 ZooKeeper クラスターの安定性および高可用性が重要になります。
- 3 ノードクラスター
- 3 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 2 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 1 つのみであれば対応できます。
- 5 ノードクラスター
- 5 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 3 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 2 つの場合まで対応できます。
- 7 ノードクラスター
- 7 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 4 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 3 つの場合まで対応できます。
開発の目的で、単一ノードの ZooKeeper を実行することも可能です。
クラスターのノードの数が多いほどクォーラムを維持するコストも高くなるため、ノードの数が多いほどパフォーマンスが向上するとは限りません。可用性の要件に応じて、使用するノードの数を決定します。
3.1.8.1. ZooKeeper ノードの数
ZooKeeper ノードの数は、Kafka.spec.zookeeper
の replicas
プロパティーを使用して設定できます。
レプリカの設定を示す例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... replicas: 3 # ...
3.1.8.2. ZooKeeper レプリカの数の変更
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.zookeeper.replicas
プロパティーで、複製された ZooKeeper サーバーの数を入力します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... replicas: 3 # ...
新しい設定を適用してリソースを作成または更新します。
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
3.1.9. ZooKeeper の設定
AMQ Streams では、Apache ZooKeeper ノードの設定をカスタマイズできます。ZooKeeper のドキュメントに記載されているほとんどのオプションを指定および設定できます。
以下に関連するオプションは設定できません。
- セキュリティー (暗号化、認証、および承認)
- リスナーの設定
- データディレクトリーの設定
- ZooKeeper クラスターの構成
これらのオプションは AMQ Streams によって自動的に設定されます。
3.1.9.1. ZooKeeper の設定
ZooKeeper ノードは、Kafka.spec.zookeeper
の config
プロパティーを使用して設定されます。このプロパティーには、ZooKeeper 設定オプションがキーとして含まれます。値は、以下の JSON タイプの 1 つを使用して記述できます。
- 文字列
- Number
- ブール値
ユーザーは、AMQ Streams で直接管理されるオプションを除き、ZooKeeper ドキュメント に記載されているオプションを指定および設定できます。以下の文字列の 1 つと同じキーまたは以下の文字列の 1 つで始まるキーを持つ設定オプションはすべて禁止されています。
-
server.
-
dataDir
-
dataLogDir
-
clientPort
-
authProvider
-
quorum.auth
-
requireClientAuthScheme
禁止されているオプションの 1 つが config
プロパティーにある場合、そのオプションは無視され、警告メッセージが Cluster Operator ログファイルに出力されます。その他のオプションは、すべて ZooKeeper に渡されます。
Cluster Operator では、提供された config
オブジェクトのキーまたは値は検証されません。無効な設定を指定すると、ZooKeeper クラスターが起動しなかったり、不安定になる可能性があります。このような場合、Kafka.spec.zookeeper.config
オブジェクトの設定を修正し、Cluster Operator によって新しい設定がすべての ZooKeeper ノードにロールアウトされるようにします。
選択したオプションのデフォルト値は次のとおりです。
-
timeTick
、デフォルト値2000
-
initLimit
、デフォルト値5
-
syncLimit
、デフォルト値2
-
autopurge.purgeInterval
、デフォルト値1
これらのオプションは、Kafka.spec.zookeeper.config
プロパティーにない場合に自動的に設定されます。
ZooKeeper 設定を示す例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... zookeeper: # ... config: autopurge.snapRetainCount: 3 autopurge.purgeInterval: 1 # ...
3.1.9.2. ZooKeeper の設定
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.zookeeper.config
プロパティーで、ZooKeeper 設定を 1 つまたは複数入力します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... zookeeper: # ... config: autopurge.snapRetainCount: 3 autopurge.purgeInterval: 1 # ...
新しい設定を適用してリソースを作成または更新します。
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
3.1.10. ZooKeeper の接続
ZooKeeper サービスは暗号化および認証でセキュア化され、AMQ Streams の一部でない外部アプリケーションでの使用は想定されていません。
しかし、kafka-topics
ツールなどの ZooKeeper への接続を必要とする Kafka CLI ツールを使用する場合は、Kafka コンテナー内でターミナルを使用し、localhost:2181
を ZooKeeper アドレスとして使用して、TLS トンネルのローカル側を ZooKeeper に接続できます。
3.1.10.1. ターミナルからの ZooKeeper への接続
Kafka コンテナー内でターミナルを開き、ZooKeeper の接続を必要とする Kafka CLI ツールを使用します。
前提条件
- OpenShift クラスターが利用できる必要があります。
- Kafka クラスターが稼働している必要があります。
- Cluster Operator が稼働している必要があります。
手順
OpenShift コンソールを使用してターミナルを開くか、CLI から
exec
コマンドを実行します。以下に例を示します。
oc exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --list --zookeeper localhost:2181
必ず
localhost:2181
を使用してください。ZooKeeper に対して Kafka コマンドを実行できるようになりました。
3.1.11. Entitiy Operator
Entity Operator は、実行中の Kafka クラスターで Kafka 関連のエンティティーを管理します。
Entity Operator は以下と構成されます。
- Kafka トピックを管理する Topic Operator
- Kafka ユーザーを管理する User Operator
Cluster Operator は Kafka
リソース設定を介して、Kafka クラスターのデプロイ時に、上記の Operator の 1 つまたは両方を含む Entity Operator をデプロイできます。
デプロイされると、デプロイメント設定に応じて、Entity Operator にオペレーターが含まれます。
これらのオペレーターは、Kafka クラスターのトピックおよびユーザーを管理するために自動的に設定されます。
3.1.11.1. Entity Operator の設定プロパティー
Entity Operator は Kafka.spec
の entityOperator
を使用して設定できます。
entityOperator
プロパティーでは複数のサブプロパティーがサポートされます。
-
tlsSidecar
-
topicOperator
-
userOperator
-
template
tlsSidecar
プロパティーは、ZooKeeper との通信に使用される TLS サイドカーコンテナーの設定に使用できます。TLS サイドカーコンテナーの設定に関する詳細は、「TLS サイドカー」 を参照してください。
template
プロパティーを使用すると、ラベル、アノテーション、アフィニティー、容認 (Toleration) などの Entity Operator Pod の詳細を設定できます。
topicOperator
プロパティーには、Topic Operator の設定が含まれます。このオプションがないと、Entity Operator は Topic Operator なしでデプロイされます。
userOperator
プロパティーには、User Operator の設定が含まれます。このオプションがないと、Entity Operator は User Operator なしでデプロイされます。
両方の Operator を有効にする基本設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: topicOperator: {} userOperator: {}
topicOperator
および userOperator
プロパティーの両方がない場合、Entity Operator はデプロイされません。
3.1.11.2. Topic Operator 設定プロパティー
Topic Operator デプロイメントは、topicOperator
オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。
watchedNamespace
-
User Operator によって
KafkaTopics
が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。 reconciliationIntervalSeconds
-
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルトは
90
です。 zookeeperSessionTimeoutSeconds
-
ZooKeeper セッションのタイムアウト (秒単位)。デフォルトは
20
です。 topicMetadataMaxAttempts
-
Kafka からトピックメタデータの取得を試行する回数。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数によって、トピックの作成に時間がかかる可能性がある場合は、この値を増やすことを検討してください。デフォルトは
6
です。 image
-
image
プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」を参照してください。 resources
-
resources
プロパティーを使用すると、Topic Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、「CPU およびメモリーリソース」を参照してください。 logging
-
logging
プロパティーは、Topic Operator のロギングを設定します。詳細は「Operator ロガー」を参照してください。
Topic Operator 設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 # ...
3.1.11.3. User Operator 設定プロパティー
User Operator デプロイメントは、userOperator
オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。
watchedNamespace
-
User Operator によって
KafkaUsers
が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。 reconciliationIntervalSeconds
-
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルトは
120
です。 zookeeperSessionTimeoutSeconds
-
ZooKeeper セッションのタイムアウト (秒単位)。デフォルトは
6
です。 image
-
image
プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」を参照してください。 resources
-
resources
プロパティーを使用すると、User Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、「CPU およびメモリーリソース」を参照してください。 logging
-
logging
プロパティーは、User Operator のロギングを設定します。詳細は「Operator ロガー」を参照してください。
User Operator 設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... userOperator: watchedNamespace: my-user-namespace reconciliationIntervalSeconds: 60 # ...
3.1.11.4. Operator ロガー
Topic Operator および User Operator には設定可能なロガーがあります。
-
rootLogger.level
これらの Operator では Apache log4j2
ロガー実装が使用されます。
logging
リソースの Kafka
プロパティーを使用して、ロガーおよびロガーレベルを設定します。
ログレベルを設定するには、ロガーとレベルを直接指定 (インライン) するか、またはカスタム (外部) ConfigMap を使用します。ConfigMap を使用する場合、logging.name
プロパティーを外部ロギング設定が含まれる ConfigMap の名前に設定します。ConfigMap 内では、ロギング設定は log4j2.properties
を使用して記述されます。
inline
および external
ロギングの例は次のとおりです。
inline ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: inline loggers: rootLogger.level: INFO # ... userOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: inline loggers: rootLogger.level: INFO # ...
外部ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: external name: customConfigMap # ...
その他のリソース
- ガベッジコレクター (GC) ロギングを有効 (または無効) にすることもできます。GC ロギングの詳細は「JVM 設定」を参照してください。
- ログレベルの詳細は、「Apache logging services」を参照してください。
3.1.11.5. Entity Operator の設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
リソースのentityOperator
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 userOperator: watchedNamespace: my-user-namespace reconciliationIntervalSeconds: 60
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.12. CPU およびメモリーリソース
AMQ Streams では、デプロイされたコンテナーごとに特定のリソースを要求し、これらのリソースの最大消費を定義できます。
AMQ Streams では、以下の 2 つのタイプのリソースがサポートされます。
- CPU
- メモリー
AMQ Streams では、CPU およびメモリーリソースの指定に OpenShift 構文が使用されます。
3.1.12.1. リソースの制限および要求
リソースの制限と要求は、以下のリソースで resources
プロパティーを使用して設定されます。
-
Kafka.spec.kafka
-
Kafka.spec.kafka.tlsSidecar
-
Kafka.spec.zookeeper
-
Kafka.spec.zookeeper.tlsSidecar
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.KafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaBridge.spec
その他のリソース
- OpenShift におけるコンピュートリソースの管理に関する詳細は、「Managing Compute Resources for Containers」を参照してください。
3.1.12.1.1. リソース要求
要求によって、指定のコンテナーに対して予約するリソースが指定されます。リソースを予約すると、リソースが常に利用できるようになります。
リソース要求が OpenShift クラスターで利用可能な空きリソースを超える場合、Pod はスケジュールされません。
リソース要求は requests
プロパティーで指定されます。AMQ Streams では、現在以下のリソース要求がサポートされます。
-
cpu
-
memory
1 つまたは複数のサポートされるリソースに対してリクエストを設定できます。
すべてのリソースを対象とするリソース要求の設定例
# ... resources: requests: cpu: 12 memory: 64Gi # ...
3.1.12.1.2. リソース制限
制限によって、指定のコンテナーが消費可能な最大リソースが指定されます。制限は予約されず、常に利用できるとは限りません。コンテナーは、リソースが利用できる場合のみ、制限以下のリソースを使用できます。リソース制限は、常にリソース要求よりも高くする必要があります。
リソース制限は limits
プロパティーで指定されます。AMQ Streams では、現在以下のリソース制限がサポートされます。
-
cpu
-
memory
1 つまたは複数のサポートされる制限に対してリソースを設定できます。
リソース制限の設定例
# ... resources: limits: cpu: 12 memory: 64Gi # ...
3.1.12.1.3. サポートされる CPU 形式
CPU の要求および制限は以下の形式でサポートされます。
-
整数値 (
5
CPU コア) または少数 (2.5
CPU コア) の CPU コアの数。 -
数値または ミリ CPU / ミリコア (
100m
)。1000 ミリコア は1
CPU コアと同じです。
CPU ユニットの例
# ... resources: requests: cpu: 500m limits: cpu: 2.5 # ...
1 つの CPU コアのコンピューティング能力は、OpenShift がデプロイされたプラットフォームによって異なることがあります。
その他のリソース
- CPU 仕様の詳細は、「Meaning of CPU」を参照してください。
3.1.12.1.4. サポートされるメモリー形式
メモリー要求および制限は、メガバイト、ギガバイト、メビバイト、およびギビバイトで指定されます。
-
メモリーをメガバイトで指定するには、
M
接尾辞を使用します。例:1000M
-
メモリーをギガバイトで指定するには、
G
接尾辞を使用します。例:1G
-
メモリーをメビバイトで指定するには、
Mi
接尾辞を使用します。例:1000Mi
-
メモリーをギビバイトで指定するには、
Gi
接尾辞を使用します。例:1Gi
異なるメモリー単位の使用例
# ... resources: requests: memory: 512Mi limits: memory: 2Gi # ...
その他のリソース
- メモリーの指定およびサポートされるその他の単位に関する詳細は、「Meaning of memory」を参照してください。
3.1.12.2. リソース要求および制限の設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
クラスターデプロイメントを指定するリソースの
resources
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... resources: requests: cpu: "8" memory: 64Gi limits: cpu: "12" memory: 128Gi # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
-
スキーマの詳細は、「
Resources
スキーマ参照」を参照してください。
3.1.13. Kafka ロガー
Kafka には独自の設定可能なロガーがあります。
-
kafka.root.logger.level
-
log4j.logger.org.I0Itec.zkclient.ZkClient
-
log4j.logger.org.apache.zookeeper
-
log4j.logger.kafka
-
log4j.logger.org.apache.kafka
-
log4j.logger.kafka.request.logger
-
log4j.logger.kafka.network.Processor
-
log4j.logger.kafka.server.KafkaApis
-
log4j.logger.kafka.network.RequestChannel$
-
log4j.logger.kafka.controller
-
log4j.logger.kafka.log.LogCleaner
-
log4j.logger.state.change.logger
-
log4j.logger.kafka.authorizer.logger
ZooKeeper にも設定可能なロガーもあります。
-
zookeeper.root.logger
Kafka と ZooKeeper では Apache log4j
ロガー実装が使用されます。
logging
プロパティーを使用してロガーおよびロガーレベルを設定します。
ログレベルを設定するには、ロガーとレベルを直接指定 (インライン) するか、またはカスタム (外部) ConfigMap を使用します。ConfigMap を使用する場合、logging.name
プロパティーを外部ロギング設定が含まれる ConfigMap の名前に設定します。ConfigMap 内では、ロギング設定は log4j.properties
を使用して記述されます。
inline
および external
ロギングの例は次のとおりです。
inline ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... logging: type: inline loggers: kafka.root.logger.level: "INFO" # ... zookeeper: # ... logging: type: inline loggers: zookeeper.root.logger: "INFO" # ... entityOperator: # ... topicOperator: # ... logging: type: inline loggers: rootLogger.level: INFO # ... userOperator: # ... logging: type: inline loggers: rootLogger.level: INFO # ...
外部ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... logging: type: external name: customConfigMap # ...
log4j2.properties
を使用して ConfigMap 内にロギング設定を記述するため、Operator によって Apache log4j2
ロガー実装が使用されます。詳細は 「Operator ロガー」 を参照してください。
その他のリソース
- ガベッジコレクター (GC) ロギングを有効 (または無効) にすることもできます。ガべージコレクションの詳細は、「JVM 設定」 を参照してください。
- ログレベルの詳細は、「Apache logging services」を参照してください。
3.1.14. Kafka のラックアウェアネス (Rack awareness)
AMQ Streams のラックアウェアネス (Rack awareness) 機能は、Kafka ブローカー Pod および Kafka トピックレプリカを異なるラック全体に分散できるようにします。ラック認識を有効にすることで、Kafka ブローカーや Kafka ブローカーがホストしているトピックの可用性を向上できるようにします。
「ラック」(Rack) は、可用性ゾーン、データセンター、またはデータセンターの実際のラックを表す可能性があります。
3.1.14.1. Kafka ブローカーでのラック認識 (Rack awareness) の設定
Kafka のラック認識 (Rack awareness) は、Kafka.spec.kafka
の rack
プロパティーで設定できます。rack
オブジェクトには、topologyKey
という名前の必須フィールドが 1 つあります。このキーは、OpenShift クラスターノードに割り当てられたラベルの 1 つと一致する必要があります。このラベルは、Kafka ブローカー Pod をノードにスケジュールする際に OpenShift によって使用されます。OpenShift クラスターがクラウドプロバイダープラットフォームで稼働している場合、そのラベルはノードが稼働している可用性ゾーンを表す必要があります。通常、ノードには、topologyKey
の値として簡単に使用できる failure-domain.beta.kubernetes.io/zone
のラベルが付けられます。これにより、ブローカー Pod がゾーン全体に分散され、Kafka ブローカー内にブローカーの broker.rack
設定パラメーターも設定されます。
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
- ノードがデプロイされたゾーンやラックを表すノードラベルについては、OpenShift 管理者に相談します。
ラベルをトポロジーキーとして使用し、
Kafka
リソースのrack
プロパティーを編集します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... rack: topologyKey: failure-domain.beta.kubernetes.io/zone # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
- Kafka ラック認識に init コンテナーイメージを設定するための詳細は 「コンテナーイメージ」 を参照してください。
3.1.15. ヘルスチェック
ヘルスチェックは、アプリケーションの健全性を検証する定期的なテストです。ヘルスチェックプローブが失敗すると、OpenShift によってアプリケーションが正常でないと見なされ、その修正が試行されます。
OpenShift では、以下の 2 つのタイプのおよび ヘルスチェックプローブがサポートされます。
- Liveness プローブ
- Readiness プローブ
プローブの詳細は、「Configure Liveness and Readiness Probes」を参照してください。AMQ Streams コンポーネントでは、両タイプのプローブが使用されます。
ユーザーは、Liveness および Readiness プローブに選択されたオプションを設定できます。
3.1.15.1. ヘルスチェックの設定
Liveness および Readiness プローブは、以下のリソースの livenessProbe
および readinessProbe
プロパティーを使用して設定できます。
-
Kafka.spec.kafka
-
Kafka.spec.kafka.tlsSidecar
-
Kafka.spec.zookeeper
-
Kafka.spec.zookeeper.tlsSidecar
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.KafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaMirrorMaker.spec
-
KafkaBridge.spec
livenessProbe
および readinessProbe
の両方によって以下のオプションがサポートされます。
-
initialDelaySeconds
-
timeoutSeconds
-
periodSeconds
-
successThreshold
-
failureThreshold
livenessProbe
および readinessProbe
オプションの詳細については、「Probe
スキーマ参照」 を参照してください。
Liveness および Readiness プローブの設定例
# ... readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ...
3.1.15.2. ヘルスチェックの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
、KafkaConnect
、またはKafkaConnectS2I
リソースのlivenessProbe
またはreadinessProbe
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.16. Prometheus メトリクス
AMQ Streams では、Apache Kafka および ZooKeeper によってサポートされる JMX メトリクスを Prometheus メトリクスに変換するために、Prometheus JMX エクスポーター を使用した Prometheus メトリクスがサポートされます。有効になったメトリクスは、9404 番ポートで公開されます。
Prometheus および Grafana の設定に関する詳細は「メトリクス」を参照してください。
3.1.16.1. メトリクスの設定
Prometheus メトリクスは、以下のリソースに metrics
プロパティーを設定して有効化されます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
metrics
プロパティーがリソースに定義されていない場合、Prometheus メトリクスは無効になります。追加設定なしで Prometheus メトリクスのエクスポートを有効にするには、空のオブジェクト ({}
) を設定します。
追加設定なしでメトリクスを有効にする例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... metrics: {} # ... zookeeper: # ...
metrics
プロパティーには、Prometheus JMX エスクポーター の追加設定が含まれることがあります。
追加の Prometheus JMX Exporter 設定を使用したメトリクスを有効化する例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... metrics: lowercaseOutputName: true rules: - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count" name: "kafka_server_$1_$2_total" - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count" name: "kafka_server_$1_$2_total" labels: topic: "$3" # ... zookeeper: # ...
3.1.16.2. Prometheus メトリクスの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
、KafkaConnect
、またはKafkaConnectS2I
リソースのmetrics
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... metrics: lowercaseOutputName: true # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.17. JMX オプション
AMQ Streams では、JMX ポートを 9999 番で開放することで、Kafka ブローカーから JMX メトリクスを取得することがサポートされます。各 Kafka ブローカーに関するさまざまなメトリクスを取得できます。たとえば、BytesPerSecond
の値やブローカーのネットワークの要求レートなどの、使用データを取得できます。AMQ Streams では、パスワードとユーザー名で保護された JMX ポートの開放や、保護されていない JMX ポートの開放がサポートされます。
3.1.17.1. JMX オプションの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
以下のリソースで jmxOptions
プロパティーを使用すると JMX オプションを設定できます。
-
Kafka.spec.kafka
Kafka ブローカーで開放された JMX ポートの、ユーザー名とパスワードの保護を設定できます。
JMX ポートのセキュリティー保護
JMX ポートをセキュアにすると、非承認の Pod によるポートへのアクセスを防ぐことができます。現在、JMX ポートをセキュアにする唯一の方法がユーザー名とパスワードを使用することです。JMX ポートのセキュリティーを有効にするには、authentication
フィールドの type
パラメーターを password
に設定します。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jmxOptions: authentication: type: "password" # ... zookeeper: # ...
これにより、ヘッドレスサービスを使用し、対応するブローカーを指定して Pod をクラスター内部にデプロイし、JMX メトリクスを取得することができます。ブローカー 0 から JMX メトリクスを取得するには、指定するヘッドレスサービスの前にブローカー 0 を追加します。
"<cluster-name>-kafka-0-<cluster-name>-<headless-service-name>"
JMX ポートがセキュアである場合、Pod のデプロイメントで JMX シークレットからユーザー名とパスワードを参照すると、そのユーザー名とパスワードを取得できます。
開放された JMX ポートの使用
JMX ポートのセキュリティーを無効にする場合は、authentication
フィールドに何も入力しません。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jmxOptions: {} # ... zookeeper: # ...
これにより、ヘッドレスサービスで JMX ポートを開放し、上記と似た方法で Pod をクラスター内にデプロイすることができます。唯一の違いは、すべての Pod が JMX ポートから読み取りできることです。
3.1.18. JVM オプション
AMQ Streams の以下のコンポーネントは、仮想マシン (VM) 内で実行されます。
- Apache Kafka
- Apache ZooKeeper
- Apache Kafka Connect
- Apache Kafka MirrorMaker
- AMQ Streams Kafka Bridge
JVM 設定オプションによって、さまざまなプラットフォームおよびアーキテクチャーのパフォーマンスが最適化されます。AMQ Streams では、これらのオプションの一部を設定できます。
3.1.18.1. JVM 設定
JVM オプションは、以下のリソースの jvmOptions
プロパティーを使用して設定できます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaMirrorMaker.spec
-
KafkaBridge.spec
使用可能な JVM オプションの選択されたサブセットのみを設定できます。以下のオプションがサポートされます。
-Xms および -Xmx
-Xms
は、JVM の起動時に最初に割り当てられる最小ヒープサイズを設定します。-Xmx
は、最大ヒープサイズを設定します。
-Xmx
や -Xms
などの JVM 設定で使用できる単位は、対応するイメージの JDK java
バイナリーによって許可される単位です。そのため、1g
または 1G
は 1,073,741,824 バイトを意味し、Gi
は接尾辞として有効な単位ではありません。これは、1G
は 1,000,000,000 バイト、1Gi
は 1,073,741,824 バイトを意味する OpenShift の慣例に準拠している メモリー要求および制限 に使用される単位とは対照的です。
-Xms
および -Xmx
に使用されるデフォルト値は、コンテナーに メモリー要求 の制限が設定されているかどうかによって異なります。
- メモリーの制限がある場合は、JVM の最小および最大メモリーは制限に対応する値に設定されます。
-
メモリーの制限がない場合、JVM の最小メモリーは
128M
に設定され、JVM の最大メモリーは定義されません。これにより、JVM のメモリーを必要に応じて拡張できます。これは、テストおよび開発での単一ノード環境に適しています。
-Xmx
を明示的に設定するには、以下の点に注意する必要があります。
-
JVM のメモリー使用量の合計は、
-Xmx
によって設定された最大ヒープの約 4 倍になります。 -
適切な OpenShift メモリー制限を設定せずに
-Xmx
が設定された場合、OpenShift ノードで、実行されている他の Pod からメモリー不足が発生するとコンテナーが強制終了される可能性があります。 -
適切な OpenShift メモリー要求を設定せずに
-Xmx
が設定された場合、コンテナーはメモリー不足のノードにスケジュールされる可能性があります。この場合、コンテナーは起動せずにクラッシュします (-Xms
が-Xmx
に設定されている場合は即座にクラッシュし、そうでない場合はその後にクラッシュします)。
-Xmx
を明示的に設定する場合は、以下を行うことが推奨されます。
- メモリー要求とメモリー制限を同じ値に設定します。
-
-Xmx
の 4.5 倍以上のメモリー要求を使用します。 -
-Xms
を-Xmx
と同じ値に設定することを検討してください。
大量のディスク I/O を実行するコンテナー (Kafka ブローカーコンテナーなど) は、オペレーティングシステムのページキャッシュとして使用できるメモリーを確保しておく必要があります。このようなコンテナーでは、要求されるメモリーは JVM によって使用されるメモリーよりもはるかに多くなります。
-Xmx
および -Xms
の設定例 (抜粋)
# ... jvmOptions: "-Xmx": "2g" "-Xms": "2g" # ...
上記の例では、JVM のヒープに 2 GiB (2,147,483,648 バイト) が使用されます。メモリー使用量の合計は約 8GiB になります。
最初のヒープサイズ (-Xms
) および最大ヒープサイズ (-Xmx
) に同じ値を設定すると、JVM が必要以上のヒープを割り当てて起動後にメモリーを割り当てないようにすることができます。Kafka および ZooKeeper Pod では、このような割り当てによって不要なレイテンシーが発生する可能性があります。Kafka Connect では、割り当ての過剰を防ぐことが最も重要になります。これは、コンシューマーの数が増えるごとに割り当て過剰の影響がより深刻になる分散モードで特に重要です。
-server
-server
はサーバー JVM を有効にします。このオプションは true または false に設定できます。
-server
の設定例 (抜粋)
# ... jvmOptions: "-server": true # ...
いずれのオプション (-server
および -XX
) も指定されないと、Apache Kafka の KAFKA_JVM_PERFORMANCE_OPTS
のデフォルト設定が使用されます。
-XX
-XX
オブジェクトは、JVM の高度なランタイムオプションの設定に使用できます。-server
および -XX
オプションは、Apache Kafka の KAFKA_JVM_PERFORMANCE_OPTS
オプションの設定に使用されます。
-XX
オブジェクトの使用例
jvmOptions: "-XX": "UseG1GC": true "MaxGCPauseMillis": 20 "InitiatingHeapOccupancyPercent": 35 "ExplicitGCInvokesConcurrent": true "UseParNewGC": false
上記の設定例の場合、JVM オプションは以下のようになります。
-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:-UseParNewGC
いずれのオプション (-server
および -XX
) も指定されないと、Apache Kafka の KAFKA_JVM_PERFORMANCE_OPTS
のデフォルト設定が使用されます。
3.1.18.1.1. ガベッジコレクターのロギング
jvmOptions
セクションでは、ガベージコレクター (GC) のロギングを有効または無効にすることもできます。GC ロギングはデフォルトで無効になっています。これを有効にするには、以下のように gcLoggingEnabled
プロパティーを設定します。
GC ロギングを有効にする例
# ... jvmOptions: gcLoggingEnabled: true # ...
3.1.18.2. JVM オプションの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
、KafkaConnect
、KafkaConnectS2I
、KafkaMirrorMaker
、またはKafkaBridge
リソースのjvmOptions
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jvmOptions: "-Xmx": "8g" "-Xms": "8g" # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.19. コンテナーイメージ
AMQ Streams では、コンポーネントに使用されるコンテナーイメージを設定できます。コンテナーイメージのオーバーライドは、別のコンテナーレジストリーを使用する必要がある特別な状況でのみ推奨されます。たとえば、AMQ Streams によって使用されるコンテナーリポジトリーにネットワークがアクセスできない場合などがこれに該当します。そのような場合は、AMQ Streams イメージをコピーするか、ソースからビルドする必要があります。設定したイメージが AMQ Streams イメージと互換性のない場合は、適切に機能しない可能性があります。
3.1.19.1. コンテナーイメージの設定
以下のリソースの image
プロパティーを使用すると、各コンポーネントに使用するコンテナーイメージを指定できます。
-
Kafka.spec.kafka
-
Kafka.spec.kafka.tlsSidecar
-
Kafka.spec.zookeeper
-
Kafka.spec.zookeeper.tlsSidecar
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.entityOperator.tlsSidecar
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaBridge.spec
3.1.19.1.1. Kafka、Kafka Connect、および Kafka MirrorMaker の image
プロパティーの設定
Kafka、Kafka Connect (S2I サポートのある Kafka Connect を含む)、および Kafka MirrorMaker では、複数のバージョンの Kafka がサポートされます。各コンポーネントには独自のイメージが必要です。異なる Kafka バージョンのデフォルトイメージは、以下の環境変数で設定されます。
-
STRIMZI_KAFKA_IMAGES
-
STRIMZI_KAFKA_CONNECT_IMAGES
-
STRIMZI_KAFKA_CONNECT_S2I_IMAGES
-
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
これらの環境変数には、Kafka バージョンと対応するイメージ間のマッピングが含まれます。マッピングは、image
および version
プロパティーとともに使用されます。
-
image
とversion
のどちらもカスタムリソースに指定されていない場合、version
は Cluster Operator のデフォルトの Kafka バージョンに設定され、環境変数のこのバージョンに対応するイメージが指定されます。 -
image
が指定されていてもversion
が指定されていない場合、指定されたイメージが使用され、Cluster Operator のデフォルトの Kafka バージョンがversion
であると想定されます。 -
version
が指定されていてもimage
が指定されていない場合、環境変数の指定されたバージョンに対応するイメージが使用されます。 -
version
とimage
の両方を指定すると、指定されたイメージが使用されます。このイメージには、指定のバージョンの Kafka イメージが含まれると想定されます。
異なるコンポーネントの image
および version
は、以下のプロパティーで設定できます。
-
Kafka の場合は
spec.kafka.image
およびspec.kafka.version
。 -
Kafka Connect、Kafka Connect S2I、および Kafka MirrorMaker の場合は
spec.image
およびspec.version
。
version
のみを提供し、image
プロパティーを未指定のままにしておくことが推奨されます。これにより、カスタムリソースの設定時に間違いが発生する可能性が低減されます。異なるバージョンの Kafka に使用されるイメージを変更する必要がある場合は、Cluster Operator の環境変数を設定することが推奨されます。
3.1.19.1.2. 他のリソースでの image
プロパティーの設定
他のカスタムリソースの image
プロパティーでは、デプロイメント中に指定の値が使用されます。image
プロパティーがない場合、Cluster Operator 設定に指定された image
が使用されます。image
名が Cluster Operator 設定に定義されていない場合、デフォルト値が使用されます。
Kafka ブローカー TLS サイドカーの場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_TLS_SIDECAR_KAFKA_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
- ZooKeeper ノードの場合:
ZooKeeper ノードの TLS サイドカーの場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_TLS_SIDECAR_ZOOKEEPER_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
Topic Operator の場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
User Operator の場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
Entity Operator TLS サイドカーの場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
Kafka Exporter の場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_KAFKA_EXPORTER_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
Kafka Bridge の場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-bridge-rhel7:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
Kafka ブローカーイニシャライザーの場合:
-
Cluster Operator 設定から
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
環境変数に指定されたコンテナーイメージ。 -
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0
コンテナーイメージ。
-
Cluster Operator 設定から
コンテナーイメージのオーバーライドは、別のコンテナーレジストリーを使用する必要がある特別な状況でのみ推奨されます。たとえば、AMQ Streams によって使用されるコンテナーリポジトリーにネットワークがアクセスできない場合などがこれに該当します。そのような場合は、AMQ Streams イメージをコピーするか、ソースからビルドする必要があります。設定したイメージが AMQ Streams イメージと互換性のない場合は、適切に機能しない可能性があります。
コンテナーイメージ設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... image: my-org/my-image:latest # ... zookeeper: # ...
3.1.19.2. コンテナーイメージの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
、KafkaConnect
、またはKafkaConnectS2I
リソースのimage
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... image: my-org/my-image:latest # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.20. TLS サイドカー
サイドカーは、Pod で実行されるコンテナーですが、サポートの目的で提供されます。AMQ Streams では、TLS サイドカーは TLS を使用して、各種のコンポーネントと ZooKeeper との間のすべての通信を暗号化および復号化します。ZooKeeper にはネイティブの TLS サポートがありません。
TLS サイドカーは以下で使用されます。
- Kafka ブローカー
- ZooKeeper ノード
- Entitiy Operator
3.1.20.1. TLS サイドカー設定
TLS サイドカーは、以下で tlsSidecar
プロパティーを使用して設定できます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator
TLS サイドカーは、以下の追加オプションをサポートします。
-
image
-
resources
-
logLevel
-
readinessProbe
-
livenessProbe
resources
プロパティーを使用すると、TLS サイドカーに割り当てられる メモリーおよび CPU リソース を指定できます。
image
プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」を参照してください。
logLevel
プロパティーは、ログレベルを指定するために使用されます。以下のログレベルがサポートされます。
- emerg
- alert
- crit
- err
- warning
- notice
- info
- debug
デフォルト値は notice です。
readinessProbe
および livenessProbe
プロパティーの設定に関する詳細は 「ヘルスチェックの設定」 を参照してください。
TLS サイドカーの設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... tlsSidecar: image: my-org/my-image:latest resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi logLevel: debug readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ... zookeeper: # ...
3.1.20.2. TLS サイドカーの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
リソースのtlsSidecar
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... tlsSidecar: resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.21. Pod スケジューリングの設定
2 つのアプリケーションが同じ OpenShift ノードにスケジュールされた場合、両方のアプリケーションがディスク I/O のように同じリソースを使用し、パフォーマンスに影響する可能性があります。これにより、パフォーマンスが低下する可能性があります。ノードを他の重要なワークロードと共有しないように Kafka Pod をスケジュールする場合、適切なノードを使用したり、Kafka 専用のノードのセットを使用すると、このような問題を適切に回避できます。
3.1.21.1. 他のアプリケーションに基づく Pod のスケジューリング
3.1.21.1.1. 重要なアプリケーションがノードを共有しないようにする
Pod の非アフィニティーを使用すると、重要なアプリケーションが同じディスクにスケジュールされないようにすることができます。Kafka クラスターの実行時に、Pod の非アフィニティーを使用して、Kafka ブローカーがデータベースなどの他のワークロードとノードを共有しないようにすることが推奨されます。
3.1.21.1.2. アフィニティー
アフィニティーは、以下のリソースの affinity
プロパティーを使用して設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。
- Pod のアフィニティーおよび非アフィニティー
- ノードのアフィニティー
affinity
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。
3.1.21.1.3. Kafka コンポーネントでの Pod の非アフィニティーの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
クラスターデプロイメントを指定するリソースの
affinity
プロパティーを編集します。ラベルを使用して、同じノードでスケジュールすべきでない Pod を指定します。topologyKey
をkubernetes.io/hostname
に設定し、選択した Pod が同じホスト名のノードでスケジュールされてはならないことを指定する必要があります。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.21.2. 特定のノードへの Pod のスケジューリング
3.1.21.2.1. ノードのスケジューリング
OpenShift クラスターは、通常多くの異なるタイプのワーカーノードで構成されます。ワークロードが非常に大きい環境の CPU に対して最適化されたものもあれば、メモリー、ストレージ (高速のローカル SSD)、または ネットワークに対して最適化されたものもあります。異なるノードを使用すると、コストとパフォーマンスの両面で最適化しやすくなります。最適なパフォーマンスを実現するには、AMQ Streams コンポーネントのスケジューリングで適切なノードを使用できるようにすることが重要です。
OpenShift は、ノードのアフィニティーを使用してワークロードを特定のノードにスケジュールします。ノードのアフィニティーにより、Pod がスケジュールされるノードにスケジューリングの制約を作成できます。制約はラベルセレクターとして指定されます。beta.kubernetes.io/instance-type
などの組み込みノードラベルまたはカスタムラベルのいずれかを使用してラベルを指定すると、適切なノードを選択できます。
3.1.21.2.2. アフィニティー
アフィニティーは、以下のリソースの affinity
プロパティーを使用して設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。
- Pod のアフィニティーおよび非アフィニティー
- ノードのアフィニティー
affinity
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。
3.1.21.2.3. Kafka コンポーネントでのノードのアフィニティーの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
AMQ Streams コンポーネントをスケジュールする必要のあるノードにラベルを付けます。
oc label
を使用してこれを行うことができます。oc label node your-node node-type=fast-network
または、既存のラベルによっては再利用が可能です。
クラスターデプロイメントを指定するリソースの
affinity
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-type operator: In values: - fast-network # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.21.3. 専用ノードの使用
3.1.21.3.1. 専用ノード
クラスター管理者は、選択した OpenShift ノードをテイントとしてマーク付けできます。テイントのあるノードは、通常のスケジューリングから除外され、通常の Pod はそれらのノードでの実行はスケジュールされません。ノードに設定されたテイントを許容できるサービスのみをスケジュールできます。このようなノードで実行されるその他のサービスは、ログコレクターやソフトウェア定義のネットワークなどのシステムサービスのみです。
テイントは専用ノードの作成に使用できます。専用のノードで Kafka とそのコンポーネントを実行する利点は多くあります。障害の原因になったり、Kafka に必要なリソースを消費するその他のアプリケーションが同じノードで実行されません。これにより、パフォーマンスと安定性が向上します。
専用ノードで Kafka Pod をスケジュールするには、ノードのアフィニティー と 許容 (toleration) を設定します。
3.1.21.3.2. アフィニティー
アフィニティーは、以下のリソースの affinity
プロパティーを使用して設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。
- Pod のアフィニティーおよび非アフィニティー
- ノードのアフィニティー
affinity
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。
3.1.21.3.3. 許容 (Toleration)
許容 (Toleration) は、以下のリソースの tolerations
プロパティーを使用して設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
tolerations
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes の「Taints and Tolerations」を参照してください。
3.1.21.3.4. 専用ノードの設定と Pod のスケジューリング
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
- 専用ノードとして使用するノードを選択します。
- これらのノードにスケジュールされているワークロードがないことを確認します。
選択したノードにテイントを設定します。
oc adm taint
を使用してこれを行うことができます。oc adm taint node your-node dedicated=Kafka:NoSchedule
さらに、選択したノードにラベルも追加します。
oc label
を使用してこれを行うことができます。oc label node your-node dedicated=Kafka
クラスターデプロイメントを指定するリソースの
affinity
およびtolerations
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: tolerations: - key: "dedicated" operator: "Equal" value: "Kafka" effect: "NoSchedule" affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: dedicated operator: In values: - Kafka # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.1.22. Kafka Exporter
Kafka
リソースを設定すると、クラスターに Kafka Exporter を自動的にデプロイできます。
Kafka Exporter は、主にオフセット、コンシューマーグループ、コンシューマーラグ、およびトピックに関連するデータである分析用のデータを Prometheus メトリクスとして抽出します。
Kafka Exporter の詳細と、パフォーマンスのためにコンシューマーラグを監視する重要性の理由については、「Kafka Exporter」を参照してください。
3.1.22.1. Kafka Exporter の設定
KafkaExporter
プロパティーを使用して、Kafka
リソースの Kafka Exporter を設定します。
Kafka
リソースとそのプロパティーの概要は、「Kafka YAML の設定例」を参照してください。
この手順では、Kafka Exporter 設定に関連するプロパティーを取り上げます。
これらのプロパティーは、Kafka クラスターのデプロイメントまたは再デプロイメントの一部として設定できます。
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
Kafka
リソースのKafkaExporter
プロパティーを編集します。設定可能なプロパティーは以下の例のとおりです。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: # ... kafkaExporter: image: my-org/my-image:latest 1 groupRegex: ".*" 2 topicRegex: ".*" 3 resources: 4 requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi logging: debug 5 enableSaramaLogging: true 6 template: 7 pod: metadata: labels: label1: value1 imagePullSecrets: - name: my-docker-credentials securityContext: runAsUser: 1000001 fsGroup: 0 terminationGracePeriodSeconds: 120 readinessProbe: 8 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: 9 initialDelaySeconds: 15 timeoutSeconds: 5 # ...
リソースを作成または更新します。
oc apply -f kafka.yaml
次のステップ
Kafka Exporter の設定およびデプロイ後に、Grafana を有効にして Kafka Exporter ダッシュボードを表示できます。
その他のリソース
3.1.23. Kafka クラスターのローリングアップデートの実行
この手順では、OpenShift アノテーションを使用して、既存の Kafka クラスターのローリングアップデートを手動でトリガーする方法を説明します。
前提条件
- 稼働中の Kafka クラスター。
- 稼働中の Cluster Operator。
手順
手動で更新する Kafka Pod を制御する
StatefulSet
の名前を見つけます。たとえば、Kafka クラスターの名前が my-cluster の場合、対応する
StatefulSet
の名前は my-cluster-kafka になります。OpenShift で
StatefulSet
リソースにアノテーションを付けます。以下はoc annotate
を使用した例になります。oc annotate statefulset cluster-name-kafka strimzi.io/manual-rolling-update=true
-
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた
StatefulSet
内のすべての Pod でローリングアップデートがトリガーされます。すべての Pod のローリングアップデートが完了すると、アノテーションはStatefulSet
から削除されます。
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
- Kafka クラスターのデプロイメントに関する詳細は、「Kafka クラスターのデプロイメント」 を参照してください。
3.1.24. ZooKeeper クラスターのローリングアップデートの実行
この手順では、OpenShift アノテーションを使用して、既存の ZooKeeper クラスターのローリングアップデートを手動でトリガーする方法を説明します。
前提条件
- 稼働中の ZooKeeper クラスター。
- 稼働中の Cluster Operator。
手順
手動で更新する ZooKeeper Pod を制御する
StatefulSet
の名前を見つけます。たとえば、Kafka クラスターの名前が my-cluster の場合、対応する
StatefulSet
の名前は my-cluster-zookeeper になります。OpenShift で
StatefulSet
リソースにアノテーションを付けます。以下はoc annotate
を使用した例になります。oc annotate statefulset cluster-name-zookeeper strimzi.io/manual-rolling-update=true
-
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた
StatefulSet
内のすべての Pod でローリングアップデートがトリガーされます。すべての Pod のローリングアップデートが完了すると、アノテーションはStatefulSet
から削除されます。
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
- ZooKeeper クラスターのデプロイメントに関する詳細は、「Kafka クラスターのデプロイメント」 を参照してください。
3.1.25. クラスターのスケーリング
3.1.25.1. Kafka クラスターのスケーリング
3.1.25.1.1. ブローカーのクラスターへの追加
トピックのスループットを向上させる主な方法は、そのトピックのパーティション数を増やすことです。これにより、追加のパーティションによってクラスター内の異なるブローカー間でトピックの負荷が共有されます。ただし、各ブローカーが特定のリソース (通常は I/O) によって制約される場合、パーティションを増やしてもスループットは向上しません。代わりに、ブローカーをクラスターに追加する必要があります。
追加のブローカーをクラスターに追加する場合、Kafka ではパーティションは自動的に割り当てられません。既存のブローカーから新規のブローカーに移動するパーティションを決定する必要があります。
すべてのブローカー間でパーティションが再分散されたら、各ブローカーのリソース使用率が低下するはずです。
3.1.25.1.2. クラスターからのブローカーの削除
AMQ Streams では StatefulSets
を使用してブローカー Pod を管理されるため、あらゆる Pod を削除できるわけではありません。クラスターから削除できるのは、番号が最も大きい 1 つまたは複数の Pod のみです。たとえば、12 個のブローカーがあるクラスターでは、Pod の名前は cluster-name-kafka-0
から cluster-name-kafka-11
になります。1 つのブローカー分をスケールダウンする場合、cluster-name-kafka-11
が削除されます。
クラスターからブローカーを削除する前に、そのブローカーにパーティションが割り当てられていないことを確認します。また、使用が停止されたブローカーの各パーティションを引き継ぐ、残りのブローカーを決める必要もあります。ブローカーに割り当てられたパーティションがなければ、クラスターを安全にスケールダウンできます。
3.1.25.2. パーティションの再割り当て
現在、Topic Operator はレプリカを別のブローカーに再割当てすることをサポートしないため、ブローカー Pod に直接接続してレプリカをブローカーに再割り当てする必要があります。
ブローカー Pod 内では、kafka-reassign-partitions.sh
ユーティリティーを使用してパーティションを別のブローカーに再割り当てできます。
これには、以下の 3 つのモードがあります。
--generate
- トピックとブローカーのセットを取り、再割り当て JSON ファイル を生成します。これにより、トピックのパーティションがブローカーに割り当てられます。これはトピック全体で動作するため、一部のトピックのパーティションを再割り当てする必要がある場合は使用できません。
--execute
- 再割り当て JSON ファイル を取り、クラスターのパーティションおよびブローカーに適用します。その結果、パーティションを取得したブローカーは、パーティションリーダーのフォロワーになります。新規ブローカーが ISR (In-Sync Replica、同期レプリカ) に参加できたら、古いブローカーはフォロワーではなくなり、そのレプリカが削除されます。
--verify
-
--verify
は、--execute
ステップと同じ 再割り当て JSON ファイル を使用して、ファイル内のすべてのパーティションが目的のブローカーに移動されたかどうかを確認します。再割り当てが完了すると、--verify は有効な スロットル も削除します。スロットルを削除しないと、再割り当てが完了した後もクラスターは影響を受け続けます。
クラスターでは、1 度に 1 つの再割当てのみを実行でき、実行中の再割当てをキャンセルすることはできません。再割り当てをキャンセルする必要がある場合は、割り当てが完了するのを待ってから別の再割り当てを実行し、最初の再割り当ての結果を元に戻します。kafka-reassign-partitions.sh
によって、元に戻すための再割り当て JSON が出力の一部として生成されます。大規模な再割り当ては、進行中の再割り当てを停止する必要がある場合に備えて、複数の小さな再割り当てに分割するようにしてください。
3.1.25.2.1. 再割り当て JSON ファイル
再割り当て JSON ファイル には特定の構造があります。
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
ここで <PartitionObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ] }
Kafka は "log_dirs"
プロパティーもサポートしますが、Red Hat AMQ Streams では使用しないでください。
以下は、トピック topic-a
およびパーティション 4
をブローカー 2
、4
、および 7
に割り当て、トピック topic-b
およびパーティション 2
をブローカー 1
、5
、および 7
に割り当てる、再割り当て JSON ファイルの例になります。
{ "version": 1, "partitions": [ { "topic": "topic-a", "partition": 4, "replicas": [2,4,7] }, { "topic": "topic-b", "partition": 2, "replicas": [1,5,7] } ] }
JSON に含まれていないパーティションは変更されません。
3.1.25.2.2. JBOD ボリューム間でのパーティションの再割り当て
Kafka クラスターで JBOD ストレージを使用する場合は、特定のボリュームとログディレクトリー (各ボリュームに単一のログディレクトリーがある) との間でパーティションを再割り当てを選択することができます。パーティションを特定のボリュームに再割り当てするには、再割り当て JSON ファイルで log_dirs
オプションを <PartitionObjects> に追加します。
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ], "log_dirs": [ <AssignedLogDirs> ] }
log_dirs
オブジェクトに含まれるログディレクトリーの数は、replicas
オブジェクトで指定されるレプリカ数と同じである必要があります。値は、ログディレクトリーへの絶対パスか、any
キーワードである必要があります。
以下に例を示します。
{ "topic": "topic-a", "partition": 4, "replicas": [2,4,7]. "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ] }
3.1.25.3. 再割り当て JSON ファイルの生成
この手順では、kafka-reassign-partitions.sh
ツールを使用して、指定のトピックセットすべてのパーティションを再割り当てする再割り当て JSON ファイルを生成する方法を説明します。
前提条件
- 稼働中の Cluster Operator が必要です。
-
Kafka
リソース - パーティションを再割り当てするトピックセット。
手順
移動するトピックを一覧表示する
topics.json
という名前の JSON ファイルを準備します。これには、以下の構造が必要です。{ "version": 1, "topics": [ <TopicObjects> ] }
ここで <TopicObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{ "topic": <TopicName> }
たとえば、
topic-a
とtopic-b
のすべてのパーティションを再割り当てするには、以下のようなtopics.json
ファイルを準備する必要があります。{ "version": 1, "topics": [ { "topic": "topic-a"}, { "topic": "topic-b"} ] }
topics.json
ファイルをブローカー Pod の 1 つにコピーします。cat topics.json | oc exec -c kafka <BrokerPod> -i -- \ /bin/bash -c \ 'cat > /tmp/topics.json'
kafka-reassign-partitions.sh
コマンドを使用して、再割り当て JSON を生成します。oc exec <BrokerPod> -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --topics-to-move-json-file /tmp/topics.json \ --broker-list <BrokerList> \ --generate
たとえば、
topic-a
およびtopic-b
のすべてのパーティションをブローカー4
および7
に移動する場合は、以下を実行します。oc exec <BrokerPod> -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 4,7 \ --generate
3.1.25.4. 手動による再割り当て JSON ファイルの作成
特定のパーティションを移動したい場合は、再割り当て JSON ファイルを手動で作成できます。
3.1.25.5. 再割り当てスロットル
パーティションの再割り当てには、ブローカーの間で大量のデータを転送する必要があるため、処理が遅くなる可能性があります。クライアントへの悪影響を防ぐため、再割り当て処理をスロットルで調整することができます。これにより、再割り当ての完了に時間がかかる可能性があります。
- スロットルが低すぎると、新たに割り当てられたブローカーは公開されるレコードに遅れずに対応することはできず、再割り当ては永久に完了しません。
- スロットルが高すぎると、クライアントに影響します。
たとえば、プロデューサーの場合は、承認待ちが通常のレイテンシーよりも大きくなる可能性があります。コンシューマーの場合は、ポーリング間のレイテンシーが大きいことが原因でスループットが低下する可能性があります。
3.1.25.6. Kafka クラスターのスケールアップ
この手順では、Kafka クラスターでブローカーの数を増やす方法を説明します。
前提条件
- 既存の Kafka クラスター。
-
拡大されたクラスターでパーティションをブローカーに再割り当てする方法が記述される
reassignment.json
というファイル名の 再割り当て JSON ファイル。
手順
-
Kafka.spec.kafka.replicas
設定オプションを増やして、新しいブローカーを必要なだけ追加します。 - 新しいブローカー Pod が起動したことを確認します。
後でコマンドを実行するブローカー Pod に
reassignment.json
ファイルをコピーします。cat reassignment.json | \ oc exec broker-pod -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
以下に例を示します。
cat reassignment.json | \ oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
同じブローカー Pod から
kafka-reassign-partitions.sh
コマンドラインツールを使用して、パーティションの再割り当てを実行します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --execute
レプリケーションをスロットルで調整する場合、
--throttle
とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 5000000 \ --execute
このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下に例を示します。
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 10000000 \ --execute
ブローカー Pod のいずれかから
kafka-reassign-partitions.sh
コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute
オプションの代わりに--verify
オプションを使用します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --verify
例:
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --verify
-
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。
3.1.25.7. Kafka クラスターのスケールダウン
その他のリソース
この手順では、Kafka クラスターでブローカーの数を減らす方法を説明します。
前提条件
- 既存の Kafka クラスター。
-
最も番号の大きい
Pod(s)
のブローカーが削除された後にクラスターのブローカーにパーティションを再割り当てする方法が記述されている、reassignment.json
という名前の 再割り当て JSON ファイル。
手順
後でコマンドを実行するブローカー Pod に
reassignment.json
ファイルをコピーします。cat reassignment.json | \ oc exec broker-pod -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
以下に例を示します。
cat reassignment.json | \ oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
同じブローカー Pod から
kafka-reassign-partitions.sh
コマンドラインツールを使用して、パーティションの再割り当てを実行します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --execute
レプリケーションをスロットルで調整する場合、
--throttle
とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 5000000 \ --execute
このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下に例を示します。
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 10000000 \ --execute
ブローカー Pod のいずれかから
kafka-reassign-partitions.sh
コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute
オプションの代わりに--verify
オプションを使用します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --verify
例:
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file /tmp/reassignment.json \ --verify
-
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。 すべてのパーティションの再割り当てが終了すると、削除されるブローカーはクラスター内のいずれのパーティションにも対応しないはずです。これは、ブローカーのデータログディレクトリーにライブパーティションのログが含まれていないことを確認すると検証できます。ブローカーのログディレクトリーに、拡張正規表現
\.[a-z0-9]-delete$
と一致しないディレクトリーが含まれる場合、ブローカーにライブパーティションがあるため、停止してはなりません。これを確認するには、以下のコマンドを実行します。
oc exec my-cluster-kafka-0 -c kafka -it -- \ /bin/bash -c \ "ls -l /var/lib/kafka/kafka-log_<N>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"
N は削除された
Pod(s)
の数に置き換えます。上記のコマンドによって出力が生成される場合、ブローカーにはライブパーティションがあります。この場合、再割り当てが終了していないか、再割り当て JSON ファイルが適切ではありません。
-
ブローカーにライブパーティションがないことが確認できたら、
Kafka
リソースのKafka.spec.kafka.replicas
を編集できます。これにより、StatefulSet
がスケールダウンされ、番号が最も大きいブローカーPod(s)
が削除されます。
3.1.26. Kafka ノードの手動による削除
その他のリソース
この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod
と、関連する PersistentVolumeClaim
の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod
と関連する PersistentVolumeClaim
は自動的に再作成されます。
PersistentVolumeClaim
を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
- 稼働中の Kafka クラスター。
- 稼働中の Cluster Operator。
手順
削除する
Pod
の名前を見つけます。たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-kafka-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。
OpenShift で
Pod
リソースにアノテーションを付けます。oc annotate
を使用します。oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに次の調整の実行を待ちます。
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
- Kafka クラスターのデプロイメントに関する詳細は、「Kafka クラスターのデプロイメント」 を参照してください。
3.1.27. ZooKeeper ノードの手動による削除
この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードの削除するには、ZooKeeper が稼働している Pod
と、関連する PersistentVolumeClaim
の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod
と関連する PersistentVolumeClaim
は自動的に再作成されます。
PersistentVolumeClaim
を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
- 稼働中の ZooKeeper クラスター。
- 稼働中の Cluster Operator。
手順
削除する
Pod
の名前を見つけます。たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-zookeeper-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。
OpenShift で
Pod
リソースにアノテーションを付けます。oc annotate
を使用します。oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに次の調整の実行を待ちます。
その他のリソース
- Cluster Operator のデプロイメントに関する詳細は、「Cluster Operator」 を参照してください。
- ZooKeeper クラスターのデプロイメントに関する詳細は、「Kafka クラスターのデプロイメント」 を参照してください。
3.1.28. ローリングアップデートのメンテナンス時間枠
メンテナンス時間枠によって、Kafka および ZooKeeper クラスターの特定のローリングアップデートが便利な時間に開始されるようにスケジュールできます。
3.1.28.1. メンテナンス時間枠の概要
ほとんどの場合、Cluster Operator は対応する Kafka
リソースの変更に対応するために Kafka または ZooKeeper クラスターのみを更新します。これにより、Kafka
リソースの変更を適用するタイミングを計画し、Kafka クライアントアプリケーションへの影響を最小限に抑えることができます。
ただし、Kafka
リソースの変更がなくても Kafka および ZooKeeper クラスターの更新が発生することがあります。たとえば、Cluster Operator によって管理される CA (認証局) 証明書が期限切れ直前である場合にローリング再起動の実行が必要になります。
サービスの 可用性 は Pod のローリング再起動による影響を受けないはずですが (ブローカーおよびトピックの設定が適切である場合)、Kafka クライアントアプリケーションの パフォーマンス は影響を受ける可能性があります。メンテナンス時間枠によって、Kafka および ZooKeeper クラスターのこのような自発的なアップデートが便利な時間に開始されるようにスケジュールできます。メンテナンス時間枠がクラスターに設定されていない場合は、予測できない高負荷が発生する期間など、不便な時間にこのような自発的なローリングアップデートが行われる可能性があります。
3.1.28.2. メンテナンス時間枠の定義
Kafka.spec.maintenanceTimeWindows
プロパティーに文字列の配列を入力して、メンテナンス時間枠を設定します。各文字列は、UTC (協定世界時、Coordinated Universal Time) であると解釈される cron 式 です。UTC は実用的にはグリニッジ標準時と同じです。
以下の例では、日、月、火、水、および木曜日の午前 0 時に開始し、午前 1 時 59 分 (UTC) に終わる、単一のメンテナンス時間枠が設定されます。
# ... maintenanceTimeWindows: - "* * 0-1 ? * SUN,MON,TUE,WED,THU *" # ...
実際には、必要な CA 証明書の更新が設定されたメンテナンス時間枠内で完了できるように、Kafka
リソースの Kafka.spec.clusterCa.renewalDays
および Kafka.spec.clientsCa.renewalDays
プロパティーとともにメンテナンス期間を設定する必要があります。
AMQ Streams では、指定の期間にしたがってメンテナンス操作を正確にスケジュールしません。その代わりに、調整ごとにメンテナンス期間が現在「オープン」であるかどうかを確認します。これは、特定の時間枠内でのメンテナンス操作の開始が、最大で Cluster Operator の調整が行われる間隔の長さ分、遅れる可能性があることを意味します。したがって、メンテナンス時間枠は最低でもその間隔の長さにする必要があります。
その他のリソース
- Cluster Operator 設定についての詳細は、「Cluster Operator の設定」 を参照してください。
3.1.28.3. メンテナンス時間枠の設定
サポートされるプロセスによってトリガーされるローリングアップデートのメンテナンス時間枠を設定できます。
前提条件
- OpenShift クラスターが必要です。
- Cluster Operator が稼働している必要があります。
手順
Kafka
リソースでmaintenanceTimeWindows
プロパティーを追加または編集します。たとえば、0800 から 1059 までと、1400 から 1559 までのメンテナンスを可能にするには、以下のようにmaintenanceTimeWindows
を設定します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... maintenanceTimeWindows: - "* * 8-10 * * ?" - "* * 14-15 * * ?"
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
その他のリソース
- Kafka クラスターのローリングアップデートの実行については、「Kafka クラスターのローリングアップデートの実行」 を参照してください。
- ZooKeeper クラスターのローリングアップデートの実行については、「ZooKeeper クラスターのローリングアップデートの実行」 を参考してください。
3.1.29. CA 証明書の手動更新
Kafka.spec.clusterCa.generateCertificateAuthority
および Kafka.spec.clientsCa.generateCertificateAuthority
オブジェクトが false
に設定されていない限り、クラスターおよびクライアント CA 証明書は、それぞれの証明書の更新期間の開始時に自動で更新されます。セキュリティー上の理由で必要であれば、証明書の更新期間が始まる前に、これらの証明書のいずれかまたは両方を手動で更新できます。更新された証明書は、古い証明書と同じ秘密鍵を使用します。
前提条件
- Cluster Operator が稼働している必要があります。
- CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。
手順
strimzi.io/force-renew
アノテーションを、更新対象の CA 証明書が含まれるSecret
に適用します。証明書 Secret annotate コマンド クラスター CA
<cluster-name>-cluster-ca-cert
oc annotate secret <cluster-name>-cluster-ca-cert strimzi.io/force-renew=true
クライアント CA
<cluster-name>-clients-ca-cert
oc annotate secret <cluster-name>-clients-ca-cert strimzi.io/force-renew=true
次回の調整で、アノテーションを付けた Secret
の新規 CA 証明書が Cluster Operator によって生成されます。メンテナンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナンス時間枠内で新規 CA 証明書が生成されます。
Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。
3.1.30. 秘密鍵の置換
クラスター CA およびクライアント CA 証明書によって使用される秘密鍵を交換できます。秘密鍵を交換すると、Cluster Operator は新しい秘密鍵の新規 CA 証明書を生成します。
前提条件
- Cluster Operator が稼働している必要があります。
- CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。
手順
strimzi.io/force-replace
アノテーションを、更新対象の秘密鍵が含まれるSecret
に適用します。秘密鍵 Secret annotate コマンド クラスター CA
<cluster-name>-cluster-ca
oc annotate secret <cluster-name>-cluster-ca strimzi.io/force-replace=true
クライアント CA
<cluster-name>-clients-ca
oc annotate secret <cluster-name>-clients-ca strimzi.io/force-replace=true
次回の調整時に、Cluster Operator は以下を生成します。
-
アノテーションを付けた
Secret
の新しい秘密鍵 - 新規 CA 証明書
メンテナンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナンス時間枠内で新しい秘密鍵と CA 証明書が生成されます。
Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。
その他のリソース
3.1.31. Kafka クラスターの一部として作成されたリソースの一覧
以下のリソースは、OpenShift クラスターの Cluster Operator によって作成されます。
cluster-name-kafka
- Kafka ブローカー Pod の管理を担当する StatefulSet。
cluster-name-kafka-brokers
- DNS が Kafka ブローカー Pod の IP アドレスを直接解決するのに必要なサービス。
cluster-name-kafka-bootstrap
- サービスは、Kafka クライアントのブートストラップサーバーとして使用できます。
cluster-name-kafka-external-bootstrap
- OpenShift クラスター外部から接続するクライアントのブートストラップサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。
cluster-name-kafka-pod-id
- トラフィックを OpenShift クラスターの外部から個別の Pod にルーティングするために使用されるサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。
cluster-name-kafka-external-bootstrap
-
OpenShift クラスターの外部から接続するクライアントのブートストラップルート。このリソースは、外部リスナーが有効になっていて、タイプ
route
に設定されている場合にのみ作成されます。 cluster-name-kafka-pod-id
-
OpenShift クラスターの外部から個別の Pod へのトラフィックに対するルート。このリソースは、外部リスナーが有効になっていて、タイプ
route
に設定されている場合にのみ作成されます。 cluster-name-kafka-config
- Kafka 補助設定が含まれ、Kafka ブローカー Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-kafka-brokers
- Kafka ブローカーキーのあるシークレット。
cluster-name-kafka
- Kafka ブローカーによって使用されるサービスアカウント。
cluster-name-kafka
- Kafka ブローカーに設定された Pod の Disruption Budget。
strimzi-namespace-name-cluster-name-kafka-init
- Kafka ブローカーによって使用されるクラスターロールバインディング。
cluster-name-zookeeper
- ZooKeeper ノード Pod の管理を担当する StatefulSet。
cluster-name-zookeeper-nodes
- DNS が ZooKeeper Pod の IP アドレスを直接解決するのに必要なサービス。
cluster-name-zookeeper-client
- Kafka ブローカーがクライアントとして ZooKeeper ノードに接続するために使用するサービス。
cluster-name-zookeeper-config
- ZooKeeper 補助設定が含まれ、ZooKeeper ノード Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-zookeeper-nodes
- ZooKeeper ノードキーがあるシークレット。
cluster-name-zookeeper
- ZooKeeper ノードに設定された Pod の Disruption Budget。
cluster-name-entity-operator
- Topic および User Operator とのデプロイメント。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-topic-operator-config
- Topic Operator の補助設定のある ConfigMap。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-user-operator-config
- User Operator の補助設定のある ConfigMap。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-operator-certs
- Kafka および ZooKeeper と通信するための Entity Operator キーのあるシークレット。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-operator
- Entity Operator によって使用されるサービスアカウント。
strimzi-cluster-name-topic-operator
- Entity Operator によって使用されるロールバインディング。
strimzi-cluster-name-user-operator
- Entity Operator によって使用されるロールバインディング。
cluster-name-cluster-ca
- クラスター通信の暗号化に使用されるクラスター CA のあるシークレット。
cluster-name-cluster-ca-cert
- クラスター CA 公開鍵のあるシークレット。このキーは、Kafka ブローカーのアイデンティティーの検証に使用できます。
cluster-name-clients-ca
- Kafka ブローカーと Kafka クライアントとの間の通信を暗号化するために使用されるクライアント CA のあるシークレット。
cluster-name-clients-ca-cert
- クライアント CA 公開鍵のあるシークレット。このキーは、Kafka ブローカーのアイデンティティーの検証に使用できます。
cluster-name-cluster-operator-certs
- Kafka および ZooKeeper と通信するための Cluster Operator キーのあるシークレット。
data-cluster-name-kafka-idx
-
Kafka ブローカー Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。 data-id-cluster-name-kafka-idx
-
Kafka ブローカー Pod
idx
のデータを保存するために使用されるボリュームid
の永続ボリューム要求です。このリソースは、永続ボリュームをプロビジョニングしてデータを保存する場合に、JBOD ボリュームに永続ストレージが選択された場合のみ作成されます。 data-cluster-name-zookeeper-idx
-
ZooKeeper ノード Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。 cluster-name-jmx
- Kafka ブローカーポートのセキュア化に使用される JMX ユーザー名およびパスワードのあるシークレット。
3.2. Kafka Connect クラスターの設定
KafkaConnect
リソースの完全なスキーマは 「KafkaConnect
スキーマ参照」 に記載されています。指定の KafkaConnect
リソースに適用されたすべてのラベルは、Kafka Connect クラスターを構成する OpenShift リソースにも適用されます。そのため、必要に応じてリソースにラベルが適用されるため便利です。
3.2.1. レプリカ
Kafka Connect クラスターは、1 つ以上のノードで構成されます。ノードの数は KafkaConnect
および KafkaConnectS2I
リソースで定義されます。Kafka Connect クラスターを複数のノードで実行すると、可用性とスケーラビリティーが向上します。ただし、OpenShift で Kafka Connect を実行する場合は、高可用性のために Kafka Connect で複数のノードを実行する必要はありません。Kafka Connect がデプロイされたノードがクラッシュした場合、OpenShift によって Kafka Connect Pod が別のノードに自動的に再スケジュールされます。ただし、他のノードがすでに稼働しているため、Kafka Connect を複数のノードで実行すると、より高速なフェイルオーバーを実現できます。
3.2.1.1. ノード数の設定
Kafka Connect ノードの数は、KafkaConnect.spec
および KafkaConnectS2I.spec
の replicas
プロパティーを使用して設定されます。
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
KafkaConnect
またはKafkaConnectS2I
リソースのreplicas
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnectS2I metadata: name: my-cluster spec: # ... replicas: 3 # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.2.2. ブートストラップサーバー
Kafka Connect クラスターは、常に Kafka クラスターと組み合わせて動作します。Kafka クラスターはブートストラップサーバーのリストとして指定されます。OpenShift では、そのリストに cluster-name-kafka-bootstrap
という名前の Kafka クラスターブートストラップサービスが含まれ、さらに平文トラフィックの場合はポート 9092、暗号化されたトラフィックの場合はポート 9093 が含まれることが理想的です。
ブートストラップサーバーのリストは、KafkaConnect.spec
および KafkaConnectS2I.spec
の bootstrapServers
プロパティーで設定されます。サーバーは、1 つ以上の Kafka ブローカーを指定するコンマ区切りリスト、または hostname:_port_
ペアとして指定される Kafka ブローカーを示すサービスとして定義される必要があります。
AMQ Streams によって管理されない Kafka クラスターで Kafka Connect を使用する場合は、クラスターの設定に応じてブートストラップサーバーのリストを指定できます。
3.2.2.1. ブートストラップサーバーの設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
KafkaConnect
またはKafkaConnectS2I
リソースのbootstrapServers
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-cluster spec: # ... bootstrapServers: my-cluster-kafka-bootstrap:9092 # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.2.3. TLS を使用した Kafka ブローカーへの接続
デフォルトでは、Kafka Connect はプレーンテキスト接続を使用して Kafka ブローカーへの接続を試みます。TLS を使用する場合は、追加の設定が必要です。
3.2.3.1. Kafka Connect での TLS サポート
TLS サポートは、KafkaConnect.spec
および KafkaConnectS2I.spec
の tls
プロパティーで設定されます。tls
プロパティーには、保存される証明書のキー名があるシークレットのリストが含まれます。証明書は X509 形式で保存する必要があります。
複数の証明書がある TLS 設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-cluster spec: # ... tls: trustedCertificates: - secretName: my-secret certificate: ca.crt - secretName: my-other-secret certificate: certificate.crt # ...
複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
同じシークレットに複数の証明書がある TLS 設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnectS2I metadata: name: my-cluster spec: # ... tls: trustedCertificates: - secretName: my-secret certificate: ca.crt - secretName: my-secret certificate: ca2.crt # ...
3.2.3.2. Kafka Connect での TLS の設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
-
TLS サーバー認証に使用される証明書の
Secret
の名前と、Secret
に保存された証明書のキー (存在する場合)。
手順
(任意手順): 認証で使用される TLS 証明書が存在しない場合はファイルで準備し、
Secret
を作成します。注記Kafka クラスターの Cluster Operator によって作成されるシークレットは直接使用されることがあります。
oc create
を使用してこれを行うことができます。oc create secret generic my-secret --from-file=my-file.crt
KafkaConnect
またはKafkaConnectS2I
リソースのtls
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect spec: # ... tls: trustedCertificates: - secretName: my-cluster-cluster-cert certificate: ca.crt # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.2.4. 認証での Kafka ブローカーへの接続
デフォルトでは、Kafka Connect は認証なしで Kafka ブローカーへの接続を試みます。認証は KafkaConnect
および KafkaConnectS2I
リソースにより有効になります。
3.2.4.1. Kafka Connect での認証サポート
認証は、KafkaConnect.spec
および KafkaConnectS2I.spec
の authentication
プロパティーで設定されます。authentication
プロパティーによって、使用する認証メカニズムのタイプと、メカニズムに応じた追加設定の詳細が指定されます。サポートされる認証タイプは次のとおりです。
- TLS クライアント認証
- SCRAM-SHA-512 メカニズムを使用した SASL ベースの認証
- PLAIN メカニズムを使用した SASL ベースの認証
- OAuth 2.0 のトークンベースの認証
3.2.4.1.1. TLS クライアント認証
TLS クライアント認証を使用するには、type
プロパティーを tls
の値に設定します。TLS クライアント認証は TLS 証明書を使用して認証します。証明書は certificateAndKey
プロパティーで指定され、常に OpenShift シークレットからロードされます。シークレットでは、公開鍵と秘密鍵の 2 つの鍵を使用して証明書を X509 形式で保存する必要があります。
TLS クライアント認証は TLS 接続でのみ使用できます。Kafka Connect の TLS 設定の詳細は 「TLS を使用した Kafka ブローカーへの接続」 を参照してください。
TLS クライアント認証の設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-cluster spec: # ... authentication: type: tls certificateAndKey: secretName: my-secret certificate: public.crt key: private.key # ...
3.2.4.1.2. SASL ベースの SCRAM-SHA-512 認証
Kafka Connect で SASL ベースの SCRAM-SHA-512 認証が使用されるようにするには、type
プロパティーを scram-sha-512
に設定します。この認証メカニズムには、ユーザー名とパスワードが必要です。
-
username
プロパティーでユーザー名を指定します。 -
passwordSecret
プロパティーで、パスワードを含むSecret
へのリンクを指定します。secretName
プロパティーにはSecret
の名前が含まれ、password
プロパティーにはSecret
内にパスワードが格納されるキーの名前が含まれます。
password
フィールドには、実際のパスワードを指定しないでください。
SASL ベースの SCRAM-SHA-512 クライアント認証の設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-cluster spec: # ... authentication: type: scram-sha-512 username: my-connect-user passwordSecret: secretName: my-connect-user password: my-connect-password-key # ...
3.2.4.1.3. SASL ベースの PLAIN 認証
Kafka Connect で SASL ベースの PLAIN 認証が使用されるようにするには、type
プロパティーを plain
に設定します。この認証メカニズムには、ユーザー名とパスワードが必要です。
SASL PLAIN メカニズムでは、ネットワーク全体でユーザー名とパスワードがプレーンテキストで転送されます。TLS 暗号化が有効になっている場合にのみ SASL PLAIN 認証を使用します。
-
username
プロパティーでユーザー名を指定します。 -
passwordSecret
プロパティーで、パスワードを含むSecret
へのリンクを指定します。secretName
プロパティーにはそのようなSecret
の名前が含まれ、password
プロパティーにはSecret
内にパスワードが格納されるキーの名前が含まれます。
password
フィールドには、実際のパスワードを指定しないでください。
SASL ベースの PLAIN クライアント認証の設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-cluster spec: # ... authentication: type: plain username: my-connect-user passwordSecret: secretName: my-connect-user password: my-connect-password-key # ...
3.2.4.2. Kafka Connect での TLS クライアント認証の設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
-
TLS クライアント認証に使用される公開鍵と秘密鍵がある
Secret
、およびSecret
に保存される公開鍵と秘密鍵のキー (存在する場合)。
手順
(任意手順): 認証に使用される鍵が存在しない場合はファイルで準備し、
Secret
を作成します。注記User Operator によって作成されたシークレットを使用できます。
oc create
を使用してこれを行うことができます。oc create secret generic my-secret --from-file=my-public.crt --from-file=my-private.key
KafkaConnect
またはKafkaConnectS2I
リソースのauthentication
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect spec: # ... authentication: type: tls certificateAndKey: secretName: my-secret certificate: my-public.crt key: my-private.key # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.2.4.3. Kafka Connect での SCRAM-SHA-512 認証の設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
- 認証に使用するユーザーのユーザー名。
-
認証に使用されるパスワードがある
Secret
の名前と、Secret
に保存されたパスワードのキー (存在する場合)。
手順
(任意手順): 認証で使用されるパスワードがない場合はファイルで準備し、
Secret
を作成します。注記User Operator によって作成されたシークレットを使用できます。
oc create
を使用してこれを行うことができます。echo -n '<password>' > <my-password.txt> oc create secret generic <my-secret> --from-file=<my-password.txt>
KafkaConnect
またはKafkaConnectS2I
リソースのauthentication
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect spec: # ... authentication: type: scram-sha-512 username: _<my-username>_ passwordSecret: secretName: _<my-secret>_ password: _<my-password.txt>_ # ...
リソースを作成または更新します。
OpenShift では
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.2.5. Kafka Connect の設定
AMQ Streams では、Apache Kafka ドキュメント に記載されている特定のオプションを編集して、Apache Kafka Connect ノードの設定をカスタマイズできます。
以下に関連する設定オプションは設定できません。
- Kafka クラスターブートストラップアドレス
- セキュリティー (暗号化、認証、および承認)
- リスナー / REST インターフェースの設定
- プラグインパスの設定
これらのオプションは AMQ Streams によって自動的に設定されます。
3.2.5.1. Kafka Connect の設定
Kafka Connect は、KafkaConnect.spec
および KafkaConnectS2I.spec
の config
プロパティーを使用して設定されます。このプロパティーには、Kafka Connect 設定オプションがキーとして含まれます。値は以下の JSON タイプのいずれかになります。
- String
- Number
- ブール値
AMQ Streams で直接管理されるオプションを除き、Apache Kafka ドキュメント に記載されているオプションを指定および設定できます。以下の文字列の 1 つと同じキーまたは以下の文字列の 1 つで始まるキーを持つ設定オプションは禁止されています。
-
ssl.
-
sasl.
-
security.
-
listeners
-
plugin.path
-
rest.
-
bootstrap.servers
禁止されているオプションが config
プロパティーにある場合、そのオプションは無視され、警告メッセージが Cluster Operator ログファイルに出力されます。その他のオプションはすべて Kafka Connect に渡されます。
提供された config
オブジェクトのキーまたは値は Cluster Operator によって検証されません。無効な設定を指定すると、Kafka Connect クラスターが起動しなかったり、不安定になる可能性があります。この状況で、KafkaConnect.spec.config
または KafkaConnectS2I.spec.config
オブジェクトの設定を修正すると、Cluster Operator は新しい設定をすべての Kafka Connect ノードにロールアウトできます。
以下のオプションにはデフォルト値があります。
-
group.id
、デフォルト値connect-cluster
-
offset.storage.topic
、デフォルト値connect-cluster-offsets
-
config.storage.topic
、デフォルト値connect-cluster-configs
-
status.storage.topic
、デフォルト値connect-cluster-status
-
key.converter
、デフォルト値org.apache.kafka.connect.json.JsonConverter
-
value.converter
、デフォルト値org.apache.kafka.connect.json.JsonConverter
これらのオプションは、KafkaConnect.spec.config
または KafkaConnectS2I.spec.config
プロパティーになかった場合に自動的に設定されます。
Kafka Connect の設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect spec: # ... config: group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 # ...
3.2.5.2. 複数インスタンスの Kafka Connect 設定
Kafka Connect のインスタンスを複数実行している場合は、以下のプロパティーのデフォルト設定に注意してください。
# ... group.id: connect-cluster 1 offset.storage.topic: connect-cluster-offsets 2 config.storage.topic: connect-cluster-configs 3 status.storage.topic: connect-cluster-status 4 # ...
これら 3 つのトピックの値は、同じ group.id
を持つすべての Kafka Connect インスタンスで同じする必要があります。
デフォルト設定を変更しないと、同じ Kafka クラスターに接続する各 Kafka Connect インスタンスは同じ値でデプロイされます。その結果、事実上はすべてのインスタンスが結合されてクラスターで実行され、同じトピックが使用されます。
複数の Kafka Connect クラスターが同じトピックの使用を試みると、Kafka Connect は想定どおりに動作せず、エラーが生成されます。
複数の Kafka Connect インスタンスを実行する場合は、インスタンスごとにこれらのプロパティーの値を変更してください。
3.2.5.3. Kafka Connect の設定
前提条件
- OpenShift クラスターが必要です。
- 稼働中の Cluster Operator が必要です。
手順
KafkaConnect
またはKafkaConnectS2I
リソースのconfig
プロパティーを編集します。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect spec: # ... config: group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 # ...
リソースを作成または更新します。
oc apply
を使用してこれを行うことができます。oc apply -f your-file
3.2.6. CPU およびメモリーリソース
AMQ Streams では、デプロイされたコンテナーごとに特定のリソースを要求し、これらのリソースの最大消費を定義できます。
AMQ Streams では、以下の 2 つのタイプのリソースがサポートされます。
- CPU
- メモリー
AMQ Streams では、CPU およびメモリーリソースの指定に OpenShift 構文が使用されます。
3.2.6.1. リソースの制限および要求
リソースの制限と要求は、以下のリソースで resources
プロパティーを使用して設定されます。
-
Kafka.spec.kafka
-
Kafka.spec.kafka.tlsSidecar
-
Kafka.spec.zookeeper
-
Kafka.spec.zookeeper.tlsSidecar
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.KafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaBridge.spec
その他のリソース
- OpenShift におけるコンピュートリソースの管理に関する詳細は、「Managing Compute Resources for Containers」を参照してください。
3.2.6.1.1. リソース要求
要求によって、指定のコンテナーに対して予約するリソースが指定されます。リソースを予約すると、リソースが常に利用できるようになります。
リソース要求が OpenShift クラスターで利用可能な空きリソースを超える場合、Pod はスケジュールされません。
リソース要求は requests
プロパティーで指定されます。AMQ Streams では、現在以下のリソース要求がサポートされます。
-
cpu
-
memory
1 つまたは複数のサポートされるリソースに対してリクエストを設定できます。
すべてのリソースを対象とするリソース要求の設定例
# ... resources: requests: cpu: 12 memory: 64Gi # ...
3.2.6.1.2. リソース制限
制限によって、指定のコンテナーが消費可能な最大リソースが指定されます。制限は予約されず、常に利用できるとは限りません。コンテナーは、リソースが利用できる場合のみ、制限以下のリソースを使用できます。リソース制限は、常にリソース要求よりも高くする必要があります。
リソース制限は limits
プロパティーで指定されます。AMQ Streams では、現在以下のリソース制限がサポートされます。
-
cpu
-
memory
1 つまたは複数のサポートされる制限に対してリソースを設定できます。
リソース制限の設定例
# ... resources: limits: cpu: 12 memory: 64Gi # ...
3.2.6.1.3. サポートされる CPU 形式
CPU の要求および制限は以下の形式でサポートされます。
-
整数値 (
5
CPU コア) または少数 (2.5
CPU コア) の CPU コアの数。 -
数値または ミリ CPU / ミリコア (
100m
)。1000 ミリコア は1
CPU コアと同じです。
CPU ユニットの例
# ... resources: requests: cpu: 500m limits: cpu: 2.5 # ...
1 つの CPU コアのコンピューティング能力は、OpenShift がデプロイされたプラットフォームによって異なることがあります。
その他のリソース
- CPU 仕様の詳細は、「Meaning of CPU」を参照してください。
3.2.6.1.4. サポートされるメモリー形式
メモリー要求および制限は、メガバイト、ギガバイト、メビバイト、およびギビバイトで指定されます。
-
メモリーをメガバイトで指定するには、
M
接尾辞を使用します。例:1000M
-
メモリーをギガバイトで指定するには、
G
接尾辞を使用します。例:1G
-
メモリーをメビバイトで指定するには、
Mi
接尾辞を使用します。例:1000Mi
- メモリーを