AMQ Streams on OpenShift のデプロイおよびアップグレード
OpenShift Container Platform での AMQ Streams 2.2 のデプロイ
概要
多様性を受け入れるオープンソースの強化 リンクのコピーリンクがクリップボードにコピーされました!
Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、今後の複数のリリースで段階的に用語の置き換えを実施して参ります。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。
第1章 デプロイメントの概要 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams は、OpenShift クラスターで Apache Kafka を実行するプロセスを簡素化します。
このガイドでは、AMQ Streams のデプロイとアップグレードに使用できるすべてのオプションについて取り上げ、さらに、デプロイメントの対象や、OpenShift クラスターで Apache Kafka を実行するために必要なデプロイメントの順序について説明します。
デプロイメントの手順を説明する他に、デプロイメントを準備および検証するためのデプロイメントの前および後の手順についても説明します。本ガイドでは、メトリクスを導入するための追加のデプロイメントオプションについても説明しています。
アップグレードの手順は、AMQ Streams および Kafka のアップグレードを参照してください。
AMQ Streams は、パブリックおよびプライベートクラウドから開発を目的とするローカルデプロイメントまで、ディストリビューションに関係なく、すべてのタイプの OpenShift クラスターで動作するよう設計されています。
1.1. デプロイメントの設定 リンクのコピーリンクがクリップボードにコピーされました!
本ガイドのデプロイメント手順は、デプロイメントの初期構造の設定に役立つように設計されています。構造の設定後、カスタムリソースを使用して、正確なニーズに合わせてデプロイメントを設定できます。デプロイメント手順では、AMQ Streams に同梱されているインストールファイルのサンプルを使用します。この手順では、設定に関する重要な考慮事項について主に触れていますが、使用可能なすべての設定オプションについて説明するわけではありません。
AMQ Streams をデプロイする前に、Kafka コンポーネントに使用できる設定オプションを確認することを推奨します。設定オプションの詳細は、OpenShift での AMQ Streams の設定を参照してください。
1.1.1. Kafka のセキュリティー リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator はデプロイ時に、クラスター内のデータ暗号化と認証に対して TLS 証明書を自動設定します。
AMQ Streams では、暗号化、認証、および 承認 の追加の設定オプションが提供されます。
- Kafka へのセキュアなアクセスの管理 を行い、Kafka クラスターとクライアント間のデータ交換のセキュリティーを確保します。
- 承認サーバーが OAuth 2.0 認証 および OAuth 2.0 承認 を使用するように、デプロイメントを設定します。
- 独自の証明書を使用して Kafka をセキュア にします。
1.1.2. デプロイメントの監視 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams は、デプロイメントを監視する追加のデプロイメントオプションをサポートします。
- Prometheus および Grafana を Kafka クラスターでデプロイ し、メトリックを抽出して、Kafka コンポーネントを監視します。
- Kafka Exporter を Kafka クラスターでデプロイ し、特にコンシューマーラグの監視に関する追加のメトリクスを抽出します。
- 分散トレーシングを設定 し、エンドツーエンドのメッセージ追跡を行います。
1.1.3. CPU およびメモリーのリソース制限および要求 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、AMQ Streams Cluster Operator はデプロイするオペランドの CPU およびメモリーリソースの要求および制限を指定しません。
Kafka などのアプリケーションでリソースの安定性を確保してパフォーマンスを向上させるには、十分なリソースを確保する必要があります。
使用する適切なリソース量は、特定の要件やユースケースによって異なります。
CPU およびメモリーリソースの設定を検討してください。AMQ Streams カスタムリソース の各コンテナーのリソース要求および制限を設定できます。
1.2. AMQ Streams のカスタムリソース リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams を使用した Kafka コンポーネントの OpenShift クラスターへのデプロイメントは、カスタムリソースの適用により高度な設定が可能です。カスタムリソースは、OpenShift リソースを拡張するために CRD (カスタムリソース定義、Custom Resource Definition) によって追加される API のインスタンスとして作成されます。
CRD は、OpenShift クラスターでカスタムリソースを記述するための設定手順として機能し、デプロイメントで使用する Kafka コンポーネントごとに AMQ Streams で提供されます。CRD およびカスタムリソースは YAML ファイルとして定義されます。YAML ファイルのサンプルは AMQ Streams ディストリビューションに同梱されています。
また、CRD を使用すると、CLI へのアクセスや設定検証などのネイティブ OpenShift 機能を AMQ Streams リソースで活用することもできます。
1.2.1. AMQ Streams カスタムリソースの例 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams 固有のリソースのインスタンス化と管理に使用されるスキーマを定義するには、CRD をクラスターに 1 回だけインストールする必要があります。
CRD をインストールして新規カスタムリソースタイプをクラスターに追加した後に、その仕様に基づいてリソースのインスタンスを作成できます。
クラスターの設定によりますが、インストールには通常、クラスター管理者権限が必要です。
カスタムリソースの管理は、AMQ Streams 管理者のみが行えます。詳細は、AMQ Streams 管理者の指定 を参照してください。
kind:Kafka などの新しい kind リソースは、OpenShift クラスター内で CRD によって定義されます。
Kubernetes API サーバーを使用すると、kind を基にしたカスタムリソースの作成が可能になり、カスタムリソースが OpenShift クラスターに追加されたときにカスタムリソースの検証および格納方法を CRD から判断します。
CRD が削除されると、そのタイプのカスタムタイプも削除されます。さらに、Pod や Statefulset などのカスタムリソースによって作成されたリソースも削除されます。
AMQ Streams 固有の各カスタムリソースは、リソースの kind の CRD によって定義されるスキーマに準拠します。AMQ Streams コンポーネントのカスタムリソースには、spec で定義される共通の設定プロパティーがあります。
CRD とカスタムリソースの関係を理解するため、Kafka トピックの CRD の例を見てみましょう。
Kafka トピックの CRD
- 1
- CRD を識別するためのトピック CRD、その名前および名前のメタデータ。
- 2
- グループ (ドメイン) 名、複数名、サポート対象のスキーマバージョンなど、この CRD の仕様。トピックの API にアクセスするために URL で使用されます。他の名前は、CLI のインスタンスリソースを識別するために使用されます。たとえば、
oc get kafkaShortNametopic my-topicやoc get kafkatopicsなどです。 - 3
- ShortName は CLI コマンドで使用できます。たとえば、
oc get kafkatopicの代わりにoc get ktを略名として使用できます。 - 4
- カスタムリソースで
getコマンドを使用する場合に示される情報。 - 5
- リソースの スキーマ参照 に記載されている CRD の現在の状態。
- 6
- openAPIV3Schema 検証によって、トピックカスタムリソースの作成が検証されます。たとえば、トピックには 1 つ以上のパーティションと 1 つのレプリカが必要です。
ファイル名に、インデックス番号とそれに続く Crd が含まれるため、AMQ Streams インストールファイルと提供される CRD YAML ファイルを識別できます。
KafkaTopic カスタムリソースに該当する例は次のとおりです。
Kafka トピックカスタムリソース
- 1
kindおよびapiVersionによって、インスタンスであるカスタムリソースの CRD が特定されます。- 2
- トピックまたはユーザーが属する Kafka クラスターの名前 (
Kafkaリソースの名前と同じ) を定義する、KafkaTopicおよびKafkaUserリソースのみに適用可能なラベル。 - 3
- 指定内容には、トピックのパーティション数およびレプリカ数や、トピック自体の設定パラメーターが示されています。この例では、メッセージがトピックに保持される期間や、ログのセグメントファイルサイズが指定されています。
- 4
KafkaTopicリソースのステータス条件。lastTransitionTimeでtype条件がReadyに変更されています。
プラットフォーム CLI からカスタムリソースをクラスターに適用できます。カスタムリソースが作成されると、Kubernetes API の組み込みリソースと同じ検証が使用されます。
KafkaTopic の作成後、Topic Operator は通知を受け取り、該当する Kafka トピックが AMQ Streams で作成されます。
1.3. Kafka Bridge を使用した Kafka クラスターへの接続 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams Kafka Bridge API を使用して、コンシューマーを作成および管理し、ネイティブ Kafka プロトコルではなく HTTP を介してレコードを送受信できます。
Kafka Bridge を設定する場合、Kafka クラスターへの HTTP アクセスを設定します。その後、Kafka Bridge を使用して、クラスターからのメッセージを生成および消費したり、REST インターフェイスを介して他の操作を実行することができます。
1.4. 本書の表記慣例 リンクのコピーリンクがクリップボードにコピーされました!
ユーザー置換値
ユーザーが置き換える値は、置き換え可能 な値とも呼ばれ、山かっこ (<>) を付けて 斜体 で表示されます。アンダースコア ( _ ) は、複数単語の値に使用されます。値がコードまたはコマンドを参照する場合は monospace も使用されます。
たとえば、以下のコードでは <my_namespace> を namespace の名前に置き換えます。
sed -i 's/namespace: .*/namespace: <my_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: <my_namespace>/' install/cluster-operator/*RoleBinding*.yaml
第2章 AMQ Streams のインストール方法 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams を OpenShift 4.8 から 4.11 にインストールする方法は 2 つあります。
| インストール方法 | 説明 |
|---|---|
|
AMQ Streams ソフトウェアダウンロードページ から、Red Hat AMQ Streams 2.2 OpenShift インストールおよびサンプルファイル をダウンロードします。
| |
| OperatorHub で Red Hat Integration - AMQ Streams を使用し、AMQ Streams を単一の namespace またはすべての namespace にデプロイします。 |
できるだけ柔軟性を確保するには、アーティファクトのインストール方法を選択してください。OperatorHub メソッドは標準的な設定を提供し、自動更新を活用できるようにします。
第3章 AMQ Streams でデプロイされる内容 リンクのコピーリンクがクリップボードにコピーされました!
Apache Kafka コンポーネントは、AMQ Streams ディストリビューションを使用して OpenShift にデプロイする場合に提供されます。Kafka コンポーネントは通常、クラスターとして実行され、可用性を確保します。
Kafka コンポーネントが組み込まれた通常のデプロイメントには以下が含まれます。
- ブローカーノードの Kafka クラスター
- レプリケートされた ZooKeeper インスタンスの zookeeper クラスター
- 外部データ接続用の Kafka Connect クラスター
- セカンダリークラスターで Kafka クラスターをミラーリングする Kafka MirrorMaker クラスター
- 監視用に追加の Kafka メトリックデータを抽出する Kafka Exporter
- Kafka クラスターに対して HTTP ベースの要求を行う Kafka Bridge
少なくとも Kafka および ZooKeeper は必要ですが、上記のコンポーネントがすべて必須なわけではありません。MirrorMaker や Kafka Connect など、一部のコンポーネントでは Kafka なしでデプロイできます。
3.1. デプロイメントの順序 リンクのコピーリンクがクリップボードにコピーされました!
OpenShift クラスターへのデプロイで必要とされる順序は、次のとおりです。
- Cluster Operator をデプロイし、Kafka クラスターを管理します。
- ZooKeeper クラスターとともに Kafka クラスターをデプロイし、Topic Operator および User Operator がデプロイメントに含まれるようにします。
任意で以下をデプロイします。
- Topic Operator および User Operator (Kafka クラスターとともにデプロイしなかった場合)
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
- メトリクスを監視するためのコンポーネント
Cluster Operator は、 Deployment、Service、および Pod リソースなど、コンポーネントの OpenShift リソースを作成します。OpenShift リソース名には、デプロイ時にコンポーネントに指定された名前が追加されます。たとえば、my-kafka-cluster という名前の Kafka クラスターには、 my-kafka-cluster-kafka という名前のサービスがあります。
第4章 AMQ Streams デプロイメントの準備 リンクのコピーリンクがクリップボードにコピーされました!
ここでは、AMQ Streams デプロイメントを準備する方法を説明します。
本ガイドのコマンドを実行するには、クラスターユーザーに RBAC (ロールベースアクセス制御) および CRD を管理する権限を付与する必要があります。
4.1. デプロイメントの前提条件 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams をデプロイするには、以下が必要です。
OpenShift 4.8 から 4.11 クラスター。
AMQ Streams は AMQ Streams Strimzi 0.29.x をベースとしている。
-
ocコマンドラインツールがインストールされ、稼働中のクラスターに接続するように設定されている。
4.2. AMQ Streams リリースアーティファクトのダウンロード リンクのコピーリンクがクリップボードにコピーされました!
デプロイメントファイルを使用して AMQ Streams をインストールするには、AMQ Streams ソフトウェアダウンロードページ からファイルをダウンロードしてデプロイメントします。
AMQ Streams のリリースアーティファクトには、YAML ファイルが含まれています。これらのファイルは、AMQ Streams コンポーネントの OpenShift へのデプロイ、共通の操作の実行、および Kafka クラスターの設定に便利です。
oc を使用して、ダウンロードした ZIP ファイルの install/cluster-operator フォルダーから Cluster Operator をデプロイします。Cluster Operator のデプロイメントおよび設定に関する詳細は、「Cluster Operator のデプロイ」 を参照してください。
また、AMQ Streams Cluster Operator によって管理されない Kafka クラスターをトピックおよび User Operator のスタンドアロンインストールと共に使用する場合は、install/topic-operator および install/user-operator フォルダーからデプロイできます。
AMQ Streams コンテナーイメージは、Red Hat Ecosystem Catalog から使用することもできます。ただし、指定の YAML ファイルを使用して AMQ Streams をデプロイすることを推奨します。
4.3. 設定ファイルとデプロイメントファイルの例 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams で提供される設定およびデプロイメントファイルの例を使用して、異なる設定で Kafka コンポーネントをデプロイし、デプロイメントを監視します。カスタムリソースの設定ファイルの例には、重要なプロパティーおよび値が含まれています。これは、独自のデプロイメントでサポートされる追加の設定プロパティーで拡張できます。
4.3.1. ファイルの場所の例 リンクのコピーリンクがクリップボードにコピーされました!
サンプルファイルは、AMQ Streams ソフトウェアダウンロードページからダウンロード可能なリリースアーティファクトとともに提供されます。
oc コマンドラインツールを使用してサンプルをダウンロードおよび適用できます。これらの例は、デプロイメントに独自の Kafka コンポーネント設定を構築する際の開始点として使用できます。
Operator を使用して AMQ Streams をインストールした場合でも、サンプルファイルをダウンロードして、そのファイルを使用して設定をアップロードできます。
4.3.2. AMQ Streams で提供されるサンプルファイル リンクのコピーリンクがクリップボードにコピーされました!
リリースアーティファクトには、examples ディレクトリーがあり、そこに設定例が含まれています。
Examples ディレクトリー
- 1
- User Operator によって管理される
KafkaUserカスタムリソース設定。 - 2
- Topic Operator によって管理される
KafkaTopicカスタムリソースの設定。 - 3
- Kafka コンポーネントの認証および承認設定。TLS および SCRAM-SHA-512 認証の設定例が含まれています。Red Hat Single Sign-On の例には、
Kafkaカスタムリソース設定および Red Hat Single Sign-On レルム仕様が含まれています。この例を使用して、Red Hat Single Sign-On 承認サービスを試すことができます。 - 4
- Mirror Maker のデプロイメント用の
Kafkaカスタムリソース設定。レプリケーションポリシーおよび同期頻度の設定例が含まれます。 - 5
- Prometheus インストールおよび Grafana ダッシュボードファイルが含まれる メトリック設定。
- 6
- Kafka のデプロイメント用の
Kafkaカスタムリソース設定。一時的または永続的なシングルまたはマルチノードデプロイメントの設定例が含まれています。 - 7
- Cruise Control のデプロイ設定を含む
Kafkaカスタムリソース。デフォルトまたはユーザー最適化ゴールを使用する設定の例とともに、Cruise Control から最適化プロポーザルを生成するためのKafkaRebalanceカスタムリソースが含まれます。 - 8
- Kafka Connect をデプロイするための
KafkaConnectおよびKafkaConnectorカスタムリソース設定。シングルまたはマルチノードデプロイメントの設定例が含まれています。 - 9
- Kafka Bridge をデプロイするための
KafkaBridgeカスタムリソース設定。
4.4. コンテナーイメージの独自のレジストリーへのプッシュ リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams のコンテナーイメージは Red Hat Ecosystem Catalog にあります。AMQ Streams によって提供されるインストール YAML ファイルは、直接 Red Hat Ecosystem Catalog からイメージをプルします。
Red Hat Ecosystem Catalog にアクセスできない場合や独自のコンテナーリポジトリーを使用する場合は以下を行います。
- リストにある すべての コンテナーイメージをプルします。
- 独自のレジストリーにプッシュします。
- インストール YAML ファイルのイメージ名を更新します。
リリースに対してサポートされる各 Kafka バージョンには別のイメージがあります。
| コンテナーイメージ | namespace/リポジトリー | 説明 |
|---|---|---|
| Kafka |
| 次を含む、Kafka を実行するための AMQ Streams イメージ
|
| Operator |
| Operator を実行するための AMQ Streams イメージ
|
| Kafka Bridge |
| AMQ Streams Kafka Bridge を稼働するための AMQ Streams イメージ |
| AMQ Streams Drain Cleaner |
| AMQ Streams Drain Cleaner を実行するための AMQ Streams イメージ |
4.5. AMQ Streams の管理者の指定 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams では、デプロイメントの設定にカスタムリソースが提供されます。デフォルトでは、これらのリソースの表示、作成、編集、および削除権限は OpenShift クラスター管理者に限定されます。AMQ Streams には、これらの権利を他のユーザーに割り当てるために使用できる 2 つのクラスターロールが用意されています。
-
strimzi-viewロールを指定すると、ユーザーは AMQ Streams リソースを表示できます。 -
strimzi-adminロールを指定すると、ユーザーは AMQ Streams リソースを作成、編集、または削除することもできます。
これらのロールをインストールすると、これらの権限が自動的にデフォルトの OpenShift クラスターロールに集約 (追加) されます。strimzi-view は view ロールに、strimzi-admin は edit および admin ロールに集約されます。このように集約することで、すでに同様の権限を持つユーザーに、これらのロールを割り当てる必要がなくなる可能性があります。
以下の手順では、クラスター管理者でないユーザーが AMQ Streams リソースを管理できるようにする strimzi-admin ロールの割り当て方法を説明します。
システム管理者は、Cluster Operator のデプロイ後に AMQ Streams の管理者を指定できます。
前提条件
- Cluster Operator でデプロイとデプロイされた CRD (カスタムリソース定義) を管理する AMQ Streams の CRD リソースおよび RBAC (ロールベースアクセス制御) リソース。
手順
OpenShift で
strimzi-viewおよびstrimzi-adminクラスターロールを作成します。oc create -f install/strimzi-admin
oc create -f install/strimzi-adminCopy to Clipboard Copied! Toggle word wrap Toggle overflow 必要な場合は、ユーザーに必要なアクセス権限を付与するロールを割り当てます。
oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2
oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2Copy to Clipboard Copied! Toggle word wrap Toggle overflow
第5章 Web コンソールを使用した OperatorHub からの AMQ Streams のインストール リンクのコピーリンクがクリップボードにコピーされました!
OpenShift Container Platform Web コンソールの OperatorHub から Red Hat Integration - AMQ Streams Operator をインストールします。
本セクションの手順では以下の方法を説明します。
5.1. Red Hat Integration Operator を使用した AMQ Streams Operator のインストール リンクのコピーリンクがクリップボードにコピーされました!
Red Hat Integration Operator (非推奨) を使用すると、Red Hat Integration コンポーネントを管理する Operator を選択およびインストールできます。複数の Red Hat Integration サブスクリプションがある場合、Red Hat Integration Operator を使用して、AMQ Streams Operator およびサブスクライブしている Red Hat Integration コンポーネントのすべての Operator をインストールおよび更新できます。
AMQ Streams Operator の場合は、Operator Lifecycle Manager (OLM) を使用して、OCP コンソールの OperatorHub から OpenShift Container Platform (OCP) クラスターに Red Hat Integration Operator をインストールできます。
Red Hat Integration Operator は非推奨となり、今後削除予定です。OpenShift 4.6 から 4.10 では、OperatorHub で利用できる予定です。
5.2. OperatorHub からの AMQ Streams Operator のインストール リンクのコピーリンクがクリップボードにコピーされました!
OpenShift Container Platform Web コンソールの OperatorHub を使用して、AMQ Streams Operator をインストールしてサブスクライブできます。
この手順では、プロジェクトを作成し、そのプロジェクトに AMQ Streams Operator をインストールする方法を説明します。プロジェクトは namespace を表します。namespace を使用して機能を分離することで管理性を確保することを推奨します。
適切な更新チャネルを使用するようにしてください。サポート対象の OpenShift のバージョンを使用している場合は、デフォルトの stable チャネルから安全に AMQ Streams をインストールできます。ただし、stable チャネルで自動更新を有効にすることは推奨されません。自動アップグレードでは、アップグレード前の必要手順がスキップされます。バージョン固有のチャネルでのみ自動アップグレードを使用します。
前提条件
-
cluster-adminまたはstrimzi-adminパーミッションを持つアカウントを使用して OpenShift Container Platform Web コンソールにアクセスできる。
手順
OpenShift Web コンソールで Home > Projects ページに移動し、インストール用のプロジェクト (namespace) を作成します。
この例では、
amq-streams-kafkaという名前のプロジェクトを使用します。- Operators > OperatorHub ページに移動します。
スクロール、または Filter by keyword ボックスにキーワードを入力して、Red Hat Integration - AMQ Streams Operator を見つけます。
Operator は、Streaming & Messaging カテゴリーにあります。
- Red Hat Integration - AMQ Streams をクリックして、Operator 情報を表示します。
- Operator に関する情報を確認し、Install をクリックします。
Install Operator ページで、次のインストールおよび更新オプションから選択します。
Update Channel: Operator の更新チャネルを選択します。
- stable チャネル (デフォルト) には最新の更新とリリースがすべて含まれます。これには、十分なテストを行った上、安定していることが想定される、メジャー、マイナー、およびマイクロリリースが含まれます。
- amq-streams-X.x チャネルには、メジャーリリースのマイナーリリースの更新およびマイクロリリースの更新が含まれます。X は、メジャーリリースのバージョン番号に置き換えてください。
- amq-streams-X.Y.x チャネルには、マイナーリリースのマイクロリリースの更新が含まれます。X はメジャーリリースのバージョン番号、Y はマイナーリリースのバージョン番号に置き換えてください。
Installation Mode: 作成したプロジェクトを選択して、特定の namespace に Operator をインストールします。
AMQ Streams Operator をクラスターのすべての namespace (デフォルトのオプション) にインストールするか、特定の namespace にインストールするかを選択できます。特定の namespace を Kafka クラスターおよびその他の AMQ Streams コンポーネント専用とすることが推奨されます。
- Update approval: デフォルトでは、OLM (Operator Lifecycle Manager) によって、AMQ Streams Operator が自動的に最新の AMQ Streams バージョンにアップグレードされます。今後のアップグレードを手動で承認する場合は、Manual を選択します。詳細は、OpenShift ドキュメントの Operators ガイドを参照してください。
Install をクリックして、選択した namespace に Operator をインストールします。
AMQ Streams Operator によって、Cluster Operator、CRD、およびロールベースアクセス制御 (RBAC) リソースは選択された namespace にデプロイされます。
Operator を使用する準備ができたら、Operators > Installed Operators に移動して、Operator が選択した namespace にインストールされていることを確認します。
ステータスは Succeeded と表示されます。
これで、AMQ Streams Operator を使用して、Kafka クラスターから順に Kafka コンポーネントをデプロイできるようになりました。
Workloads > Deployments に移動すると、Cluster Operator および Entity Operator のデプロイメントの詳細を確認できます。Cluster Operator の名前には、バージョン番号 amq-streams-cluster-operator-<version> が含まれます。AMQ Streams インストールアーティファクトを使用して Cluster Operator をデプロイする場合、名前は異なります。この場合、名前は strimzi-cluster-operator です。
5.3. AMQ Streams Operator を使用した Kafka コンポーネントのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Openshift にインストールすると、AMQ Streams Operator は、ユーザーインターフェイスから Kafka コンポーネントをインストールできるようにします。
次の Kafka コンポーネントをインストールできます。
- Kafka
- Kafka Connect
- Kafka MirrorMaker
- Kafka MirrorMaker 2
- Kafka Topic
- Kafka User
- Kafka Bridge
- Kafka Connector
- Kafka Rebalance
コンポーネントを選択して、インスタンスを作成します。少なくとも、Kafka インスタンスを作成します。この手順では、デフォルト設定を使用して Kafka インスタンスを作成する方法を説明します。インストールを実行する前に、デフォルトのインストール仕様を設定できます。
プロセスは、他の Kafka コンポーネントのインスタンスを作成する場合と同じです。
前提条件
- AMQ Streams Operator が OpenShift クラスターにインストールされている。
手順
Web コンソールで Operators > Installed Operators ページに移動し、Red Hat Integration - AMQ Streams をクリックして、Operator の詳細を表示します。
提供されている API から、Kafka コンポーネントのインスタンスを作成できます。
Kafka の下の Create instance をクリックして、Kafka インスタンスを作成します。
デフォルトでは、3 つの Kafka ブローカーノードと 3 つの ZooKeeper ノードを持つ
my-clusterという名の Kafka クラスターを作成します。クラスターはエフェメラルストレージを使用します。Create をクリックして、Kafka のインストールを開始します。
ステータスが Ready に変わるまで待ちます。
第6章 インストールアーティファクトを使用した AMQ Streams のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams のデプロイメント環境を準備 したら、AMQ Streams を OpenShift クラスターにデプロイできます。リリースアーティファクトで提供されるデプロイメントファイルを使用できます。
デプロイメントファイルを使用して Kafka クラスターを作成します。
任意で、要件に応じて以下の Kafka コンポーネントをデプロイできます。
AMQ Streams は Strimzi 0.29.x をベースとしています。AMQ Streams 2.2 は、OpenShift 4.8 から 4.11 にデプロイできます。
本ガイドのコマンドを実行するには、クラスターユーザーに RBAC (ロールベースアクセス制御) および CRD を管理する権限を付与する必要があります。
6.1. Kafka クラスターの作成 リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator で Kafka クラスターを管理できるようにするには、これを Kafka リソースとしてデプロイする必要があります。AMQ Streams では、この目的のために、デプロイメントファイルのサンプルが同梱されています。これらのファイルを使用して、Topic Operator および User Operator を同時にデプロイできます。
Kafka クラスターを Kafka リソースとしてデプロイしていない場合は、Cluster Operator を使用してこのクラスターを管理できません。これには、OpenShift 外で実行されている Kafka クラスターなどが該当します。ただし、Topic Operator および User Operator は、スタンドアロンコンポーネントとしてデプロイし、使用することができます。
Cluster Operator は、OpenShift クラスターの 1 つ、複数、またはすべての namespace を監視できます。Topic Operator および User Operator は、単一の namespace で KafkaTopic および KafkaUser リソースを監視します。詳細は、AMQ Streams Operator を使用した namespace の監視 を参照してください。
6.1.1. Topic Operator および User Operator と Kafka クラスターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams によって管理される Kafka クラスターを Topic Operator および User Operator と使用する場合は、このデプロイメント手順を実行します。
- Cluster Operator をデプロイします。
Cluster Operator を使用して以下をデプロイします。
6.1.2. スタンドアロン Topic Operator および User Operator のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams によって管理されない Kafka クラスターを Topic Operator および User Operator と使用する場合は、このデプロイメント手順を実行します。
6.1.3. Cluster Operator のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は、OpenShift クラスター内で Apache Kafka クラスターのデプロイおよび管理を行います。
本セクションの手順では、以下のいずれかを監視するように Cluster Operator をデプロイする方法を説明します。
6.1.3.1. Cluster Operator デプロイメントの監視オプション リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator の稼働中に、Kafka リソースの更新に対する監視が開始されます。
Cluster Operator をデプロイして、以下からの Kafka リソースの監視を選択できます。
- 単一の namespace (Cluster Operator が含まれる同じ namespace)
- 複数の namespace
- すべての namespace
AMQ Streams では、デプロイメントの処理を簡単にするため、YAML ファイルのサンプルが提供されます。
Cluster Operator では、以下のリソースの変更が監視されます。
-
Kafka クラスターの
Kafka。 -
Kafka Connect クラスターの
KafkaConnect。 -
Kafka Connect クラスターでコネクターを作成および管理するための
KafkaConnector。 -
Kafka MirrorMaker インスタンスの
KafkaMirrorMaker。 -
Kafka MirrorMaker 2.0 インスタンスの
KafkaMirrorMaker2。 -
Kafka Bridge インスタンスの
KafkaBridge。 -
Cruise Control の最適化リクエストの
KafkaRebalance。
OpenShift クラスターでこれらのリソースの 1 つが作成されると、Operator がクラスターの詳細をリソースから取得します。さらに、StatefulSet、Service、および ConfigMap などの必要な OpenShift リソースが作成され、リソースの新しいクラスターの作成が開始されます。
Kafka リソースが更新されるたびに、リソースのクラスターを設定する OpenShift リソースで該当する更新が Operator によって実行されます。
リソースは、パッチを適用するか削除してから、再作成して、目的とするクラスターの状態を、リソースのクラスターに反映させます。この操作は、サービスの中断を引き起こすローリング更新の原因となる可能性があります。
リソースが削除されると、Operator によってクラスターがアンデプロイされ、関連する OpenShift リソースがすべて削除されます。
6.1.3.2. 単一の namespace を監視対象とする Cluster Operator のデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift クラスターの単一の namespace で AMQ Streams リソースを監視する Cluster Operator をデプロイする方法を説明します。
前提条件
-
この手順では、
CustomResourceDefinitions、ClusterRoles、およびClusterRoleBindingsを作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:adminなどの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator のインストール先の namespace を使用するように、AMQ Streams インストールファイルを編集します。
たとえば、この手順では Cluster Operator は
<my_cluster_operator_namespace>という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Cluster Operator をデプロイします。
oc create -f install/cluster-operator -n <my_cluster_operator_namespace>
oc create -f install/cluster-operator -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments -n <my_cluster_operator_namespace>
oc get deployments -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE strimzi-cluster-operator 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE strimzi-cluster-operator 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.1.3.3. 複数の namespace を監視対象とする Cluster Operator のデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift クラスターの複数の namespace で AMQ Streams リソースを監視する Cluster Operator をデプロイする方法を説明します。
前提条件
-
この手順では、
CustomResourceDefinitions、ClusterRoles、およびClusterRoleBindingsを作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:adminなどの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator のインストール先の namespace を使用するように、AMQ Streams インストールファイルを編集します。
たとえば、この手順では Cluster Operator は
<my_cluster_operator_namespace>という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow install/cluster-operator/060-Deployment-strimzi-cluster-operator.yamlファイルを編集し、Cluster Operator が監視するすべての namespace のリストをSTRIMZI_NAMESPACE環境変数に追加します。たとえば、この手順では Cluster Operator は
watched-namespace-1、watched-namespace-2、およびwatched-namespace-3という namespace を監視します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow リストした各 namespace に
RoleBindingsをインストールします。この例では、コマンドの
watched-namespaceを前述のステップでリストした namespace に置き換えます。watched-namespace-1、watched-namespace-2、およびwatched-namespace-3にも、繰り返し同様の操作を実行します。oc create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n <watched_namespace> oc create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n <watched_namespace>
oc create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n <watched_namespace> oc create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n <watched_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Cluster Operator をデプロイします。
oc create -f install/cluster-operator -n <my_cluster_operator_namespace>
oc create -f install/cluster-operator -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments -n <my_cluster_operator_namespace>
oc get deployments -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE strimzi-cluster-operator 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE strimzi-cluster-operator 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.1.3.4. すべての namespace を対象とする Cluster Operator のデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift クラスターのすべての namespace で AMQ Streams リソースを監視する Cluster Operator をデプロイする方法を説明します。
このモードで実行している場合、Cluster Operator は、新規作成された namespace でクラスターを自動的に管理します。
前提条件
-
この手順では、
CustomResourceDefinitions、ClusterRoles、およびClusterRoleBindingsを作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:adminなどの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator のインストール先の namespace を使用するように、AMQ Streams インストールファイルを編集します。
たとえば、この手順では Cluster Operator は
<my_cluster_operator_namespace>という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow install/cluster-operator/060-Deployment-strimzi-cluster-operator.yamlファイルを編集し、STRIMZI_NAMESPACE環境変数の値を*に設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow クラスター全体ですべての namespace にアクセスできる権限を Cluster Operator に付与する
ClusterRoleBindingsを作成します。oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount <my_cluster_operator_namespace>:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount <my_cluster_operator_namespace>:strimzi-cluster-operator
oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount <my_cluster_operator_namespace>:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount <my_cluster_operator_namespace>:strimzi-cluster-operatorCopy to Clipboard Copied! Toggle word wrap Toggle overflow <my_cluster_operator_namespace>は、Cluster Operator をインストールする namespace に置き換えます。Cluster Operator を OpenShift クラスターにデプロイします。
oc create -f install/cluster-operator -n <my_cluster_operator_namespace>
oc create -f install/cluster-operator -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments -n <my_cluster_operator_namespace>
oc get deployments -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE strimzi-cluster-operator 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE strimzi-cluster-operator 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.1.4. Kafka のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Apache Kafka は、耐障害性のリアルタイムデータフィードを実現する、オープンソースの分散型 publish/subscribe メッセージングシステムです。
本セクションの手順では、以下を説明します。
Cluster Operator を使用して以下をデプロイする方法
- 一時 または 永続 Kafka クラスター
Topic Operator および User Operator (
Kafkaカスタムリソースを設定してデプロイする)
Topic Operator および User Operator の代替のスタンドアロンデプロイメント手順
Kafka をインストールする場合、AMQ Streams によって ZooKeeper クラスターもインストールされ、Kafka と ZooKeeper との接続に必要な設定が追加されます。
6.1.4.1. Kafka クラスターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して Kafka クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使用して Kafka リソースが作成されます。
AMQ Streams には、設定ファイルのサンプル が含まれています。Kafka デプロイメントでは、以下の例が提供されます。
kafka-persistent.yaml- 3 つの Zookeeper ノードと 3 つの Kafka ノードを使用して永続クラスターをデプロイします。
kafka-jbod.yaml- それぞれが複数の永続ボリューを使用する、3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、永続クラスターをデプロイします。
kafka-persistent-single.yaml- 1 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、永続クラスターをデプロイします。
kafka-ephemeral.yaml- 3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、一時クラスターをデプロイします。
kafka-ephemeral-single.yaml- 3 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、一時クラスターをデプロイします。
この手順では、一時 および 永続 Kafka クラスターデプロイメントの例を使用します。
- 一時クラスター
-
通常、Kafka の一時クラスターは開発およびテスト環境での使用に適していますが、本番環境での使用には適していません。このデプロイメントでは、ブローカー情報 (ZooKeeper) と、トピックまたはパーティション (Kafka) を格納するための
emptyDirボリュームが使用されます。emptyDirボリュームを使用すると、その内容は Pod のライフサイクルと厳密な関係を持つため、Pod がダウンすると削除されます。 - 永続クラスター
Kafka の永続クラスターでは、永続ボリュームを使用して ZooKeeper および Kafka データを格納します。
PersistentVolumeClaimを使用してPersistentVolumeが取得され、PersistentVolumeの実際のタイプには依存しません。PersistentVolumeClaimでStorageClassを使用し、自動ボリュームプロビジョニングをトリガーすることができます。StorageClassが指定されていない場合、OpenShift はデフォルトのStorageClassを使用しようとします。次の例では、一般的なタイプの永続ボリュームを一部紹介しています。
- OpenShift クラスターが Amazon AWS で実行されている場合、OpenShift は Amazon EBS ボリュームをプロビジョニングできます。
- OpenShift クラスターが Microsoft Azure で実行されている場合、OpenShift は Azure Disk Storage ボリュームをプロビジョニングできます。
- OpenShift クラスターが Google Cloud で実行されている場合、OpenShift は永続ディスクボリュームをプロビジョニングできます
- OpenShift クラスターがベアメタルで実行されている場合、OpenShift はローカル永続ボリュームをプロビジョニングできます
このサンプル YAML ファイルは、最新のサポート対象 Kafka バージョン、サポート対象のログメッセージ形式バージョンとブローカー間のプロトコルバージョンの設定を指定します。Kafka config の inter.broker.protocol.version プロパティーは、指定された Kafka バージョン (spec.kafka.version) によってサポートされるバージョンである必要があります。このプロパティーは、Kafka クラスターで使用される Kafka プロトコルのバージョンを表します。
Kafka 3.0.0 以降、inter.broker.protocol.version が 3.0 以上に設定されていると、log.message.format.version オプションは無視されるため、設定する必要はありません。
Kafka のアップグレード 時に、inter.broker.protocol.version への更新が必要です。
サンプルクラスターの名前はデフォルトで my-cluster になります。クラスター名はリソースの名前によって定義され、クラスターがデプロイされた後に変更できません。クラスターをデプロイする前にクラスター名を変更するには、関連する YAML ファイルにある Kafka リソースの Kafka.metadata.name プロパティーを編集します。
デフォルトのクラスター名および指定された Kafka バージョン
手順
一時 または 永続 クラスターを作成およびデプロイします。
開発またはテストでは、一時クラスターの使用が適している可能性があります。永続クラスターはどのような状況でも使用することができます。
一時 クラスターを作成およびデプロイするには、以下を実行します。
oc apply -f examples/kafka/kafka-ephemeral.yaml
oc apply -f examples/kafka/kafka-ephemeral.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 永続 クラスターを作成およびデプロイするには、以下を実行します。
oc apply -f examples/kafka/kafka-persistent.yaml
oc apply -f examples/kafka/kafka-persistent.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow
デプロイメントのステータスを確認します。
oc get pods -n <my_cluster_operator_namespace>
oc get pods -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Pod 名および readiness が表示される出力
Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-clusterは Kafka クラスターの名前です。デフォルトのデプロイメントでは、Entity Operator クラスター、3 つの Kafka Pod、および 3 つの ZooKeeper Pod をインストールします。
READYは、Ready/expected 状態のレプリカ数を表示します。STATUSがRunningと表示されると、デプロイメントは成功しています。
6.1.4.2. Cluster Operator を使用した Topic Operator のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して Topic Operator をデプロイする方法を説明します。
Kafka リソースの entityOperator プロパティーを設定し、topicOperator が含まれるようにします。デフォルトでは、Topic Operator は Cluster Operator によってデプロイされた Kafka クラスターの namespace で KafkaTopic リソースを監視します。Topic Operator spec で watchedNamespace を使用して namespace を指定することもできます。1 つの Topic Operator が監視できるのは、namespace 1 つです。1 つの namespace を監視するのは、Top Operator 1 つのみとします。
AMQ Streams を使用して複数の Kafka クラスターを同じ namespace にデプロイする場合は、1 つの Kafka クラスターに対してのみ Topic Operator を有効にするか、watchedNamespace プロパティーを使用して Topic Operators が他の namespace を監視するように設定します。
AMQ Streams によって管理されない Kafka クラスターを Topic Operator と使用する場合は、Topic Operator をスタンドアロンコンポーネントとしてデプロイ する必要があります。
entityOperator および topicOperator プロパティーの設定に関する詳細は、エンティティー Operator の設定 を参照してください。
手順
KafkaリソースのentityOperatorプロパティーを編集し、topicOperatorが含まれるようにします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow EntityTopicOperatorSpecスキーマ参照に記載されているプロパティーを使用して、Topic Operator のspecを設定します。すべてのプロパティーにデフォルト値を使用する場合は、空のオブジェクト (
{}) を使用します。リソースを作成または更新します。
次のように
oc applyを使用します。oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get pods -n <my_cluster_operator_namespace>
oc get pods -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Pod 名と準備状況が表示される出力
NAME READY STATUS RESTARTS my-cluster-entity-operator 3/3 Running 0 # ...
NAME READY STATUS RESTARTS my-cluster-entity-operator 3/3 Running 0 # ...Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-clusterは Kafka クラスターの名前です。READYは、Ready/expected 状態のレプリカ数を表示します。STATUSがRunningと表示されると、デプロイメントは成功しています。
6.1.4.3. Cluster Operator を使用した User Operator のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して User Operator をデプロイする方法を説明します。
Kafka リソースの entityOperator プロパティーを設定し、userOperator が含まれるようにします。デフォルトでは、User Operator は Kafka クラスターデプロイメントの namespace で KafkaUser リソースを監視します。User Operator spec で watchedNamespace を使用して namespace を指定することもできます。1 つの User Operator が監視できるのは、namespace 1 つです。1 つの namespace を監視するのは、User Operator 1 つのみとします。
AMQ Streams によって管理されない Kafka クラスターを User Operator と使用する場合は、User Operator をスタンドアロンコンポーネントとしてデプロイ する必要があります。
entityOperator および userOperator プロパティーの設定に関する詳細は、エンティティー Operator の設定 を参照してください。
手順
KafkaリソースのentityOperatorプロパティーを編集し、userOperatorが含まれるようにします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow EntityUserOperatorSpecスキーマ参照 に記載されているプロパティーを使用して、User Operator のspecを設定します。すべてのプロパティーにデフォルト値を使用する場合は、空のオブジェクト (
{}) を使用します。リソースを作成または更新します。
oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get pods -n <my_cluster_operator_namespace>
oc get pods -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Pod 名と準備状況が表示される出力
NAME READY STATUS RESTARTS my-cluster-entity-operator 3/3 Running 0 # ...
NAME READY STATUS RESTARTS my-cluster-entity-operator 3/3 Running 0 # ...Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-clusterは Kafka クラスターの名前です。READYは、Ready/expected 状態のレプリカ数を表示します。STATUSがRunningと表示されると、デプロイメントは成功しています。
6.1.5. AMQ Streams Operator の代替のスタンドアロンデプロイメントオプション リンクのコピーリンクがクリップボードにコピーされました!
Topic Operator および User Operator のスタンドアロンデプロイメントを実行できます。Cluster Operator によって管理されない Kafka クラスターを使用している場合は、これらの Operator のスタンドアロンデプロイメントを検討してください。
Operator を OpenShift にデプロイします。Kafka は OpenShift 外で実行できます。たとえば、Kafka をマネージドサービスとして使用する場合があります。スタンドアロン Operator のデプロイメント設定を調整し、Kafka クラスターのアドレスと一致するようにします。
6.1.5.1. スタンドアロン Topic Operator のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Topic Operator をトピック管理のスタンドアロンコンポーネントとしてデプロイする方法を説明します。スタンドアロン Topic Operator を Cluster Operator によって管理されない Kafka クラスターと使用できます。
スタンドアロンデプロイメントは、任意の Kafka クラスターで操作できます。
スタンドアロンデプロイメントファイルは AMQ Streams で提供されます。05-Deployment-strimzi-topic-operator.yaml デプロイメントファイルを使用して、Topic Operator をデプロイします。Kafka クラスターへの接続に必要な環境変数を追加または設定します。
Topic Operator は、単一の namespace で KafkaTopic リソースを監視します。Topic Operator 設定で、監視する namespace と Kafka クラスターへの接続を指定します。1 つの Topic Operator が監視できるのは、namespace 1 つです。1 つの namespace を監視するのは、Top Operator 1 つのみとします。複数の Topic Operator を使用する場合は、それぞれが異なる namespace を監視するように設定します。このようにして、Topic Operator を複数の Kafka クラスターで使用できます。
前提条件
Topic Operator の接続先となる Kafka クラスターを実行している。
スタンドアロンの Topic Operator が接続用に正しく設定されている限り、Kafka クラスターはベアメタル環境、仮想マシン、またはマネージドクラウドアプリケーションサービスで実行できます。
手順
install/topic-operator/05-Deployment-strimzi-topic-operator.yamlスタンドアロンデプロイメントファイルのenvプロパティーを編集します。スタンドアロンの Topic Operator デプロイメント設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
KafkaTopicリソースを監視する Topic Operator の OpenShift namespace。Kafka クラスターの namespace を指定します。- 2
- Kafka クラスターのすべてのブローカーを検出し、接続するブートストラップブローカーアドレスのホストとポートのペア。サーバーがダウンした場合に備えて、コンマ区切りリストを使用して 2 つまたは 3 つのブローカーアドレスを指定します。
- 3
- Topic Operator によって管理される
KafkaTopicリソースを識別するラベル。これは、Kafka クラスターの名前である必要はありません。KafkaTopicリソースに割り当てられたラベルにすることができます。複数の Topic Operator をデプロイする場合、ラベルはそれぞれに一意である必要があります。つまり、Operator は同じリソースを管理できません。 - 4
- ZooKeeper クラスターに接続するためのアドレスのホストおよびポートのペア。これは、Kafka クラスターが使用する ZooKeeper クラスターと同じである必要があります。
- 5
- ZooKeeper セッションのタイムアウト (秒単位)。デフォルトは
18000(18 秒) です。 - 6
- 定期的な調整の間隔 (秒単位)。デフォルトは
120000(2 分) です。 - 7
- Kafka からトピックメタデータの取得を試行する回数。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数が原因で、トピックの作成に時間がかかる場合は、この値を大きくすることを検討してください。デフォルトの試行回数は
6回です。 - 8
- ロギングメッセージの出力レベル。レベルを、
ERROR、WARNING、INFO、DEBUG、またはTRACEに設定できます。 - 9
- Kafka ブローカーとの暗号化された通信の TLS サポートを有効にします。
- 10
- (任意) Topic Operator を実行する JVM に使用される Java オプション。
- 11
- (任意) Topic Operator に設定されたデバッグ (
-D) オプション。 - 12
- (オプション)TLS が
STRIMZI_TLS_ENABLEDによって有効になっている場合、トラストストア証明書の生成を省略します。この環境変数が有効になっている場合、ブローカーは TLS 証明書に公的に信頼できる認証局を使用する必要があります。デフォルトはfalseです。 - 13
- (オプション) 相互 TLS 認証用のキーストア証明書を生成します。これを
falseに設定すると、TLS を使用した Kafka ブローカーへのクライアント認証が無効になります。デフォルトはtrueです。 - 14
- (オプション)Kafka ブローカーに接続するときにクライアント認証の SASL サポートを有効にします。デフォルトは
falseです。 - 15
- (任意) クライアント認証用の SASL ユーザー名。SASL が
STRIMZI_SASL_ENABLEDによって有効化された場合のみ必須です。 - 16
- (任意) クライアント認証用の SASL パスワード。SASL が
STRIMZI_SASL_ENABLEDによって有効化された場合のみ必須です。 - 17
- (任意) クライアント認証用の SASL メカニズム。SASL が
STRIMZI_SASL_ENABLEDによって有効化された場合のみ必須です。この値はplain、scram-sha-256、またはscram-sha-512に設定できます。 - 18
- (任意)Kafka ブローカーとの通信に使用されるセキュリティープロトコル。デフォルト値は PLAINTEXT です。値は
PLAINTEXT、SSL、SASL_PLAINTEXT、またはSASL_SSLに設定できます。
-
公開認証局から証明書を使用している Kafka ブ ローカーに接続する場合は、
STRIMZI_PUBLIC_CAをtrueに設定します。たとえば、Amazon AWS MSK サービスを使用している場合は、このプロパティーをtrueに設定します。 STRIMZI_TLS_ENABLED環境変数で TLS を有効にした場合、Kafka クラスターへの接続を認証するために使用されるキーストアおよびトラストストアを指定します。TLS 設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Topic Operator をデプロイします。
oc create -f install/topic-operator
oc create -f install/topic-operatorCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments
oc get deploymentsCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE strimzi-topic-operator 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE strimzi-topic-operator 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.1.5.2. スタンドアロン User Operator のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、ユーザー管理のスタンドアロンコンポーネントとして User Operator をデプロイする方法を説明します。Cluster Operator の管理対象外となっている Kafka クラスターでは、スタンドアロンの User Operator を使用します。
スタンドアロンデプロイメントは、任意の Kafka クラスターで操作できます。
スタンドアロンデプロイメントファイルは AMQ Streams で提供されます。05-Deployment-strimzi-user-operator.yaml デプロイメントファイルを使用して、User Operator をデプロイします。Kafka クラスターへの接続に必要な環境変数を追加または設定します。
User Operator は、単一の namespace で KafkaUser リソースを監視します。User Operator 設定で、監視する namespace と Kafka クラスターへの接続を指定します。1 つの User Operator が監視できるのは、namespace 1 つです。1 つの namespace を監視するのは、User Operator 1 つのみとします。複数の User Operator を使用する場合は、それぞれが異なる namespace を監視するように設定します。このようにして、User Operator を複数の Kafka クラスターで使用できます。
前提条件
User Operator の接続先となる Kafka クラスターを実行している。
スタンドアロンの User Operator が接続用に正しく設定されている限り、Kafka クラスターはベアメタル環境、仮想マシン、またはマネージドクラウドアプリケーションサービスで実行できます。
手順
以下の
envプロパティーをinstall/user-operator/05-Deployment-strimzi-user-operator.yamlスタンドアロンデプロイメントファイルで編集します。スタンドアロン User Operator デプロイメント設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
KafkaUserリソースを監視する User Operator の OpenShift namespace。指定できる namespace は 1 つだけです。- 2
- Kafka クラスターのすべてのブローカーを検出し、接続するブートストラップブローカーアドレスのホストとポートのペア。サーバーがダウンした場合に備えて、コンマ区切りリストを使用して 2 つまたは 3 つのブローカーアドレスを指定します。
- 3
- TLS クライアント認証に対して新しいユーザー証明書に署名する認証局の公開鍵 (
ca.crt) の値が含まれる OpenShiftSecret。 - 4
- TLS クライアント認証に対して新しいユーザー証明書に署名する認証局の秘密鍵 (
ca.key) の値が含まれる OpenShiftSecret。 - 5
- User Operator によって管理される
KafkaUserリソースを識別するラベル。これは、Kafka クラスターの名前である必要はありません。KafkaUserリソースに割り当てられたラベルにすることができます。複数の User Operator をデプロイする場合、ラベルはそれぞれに一意である必要があります。つまり、Operator は同じリソースを管理できません。 - 6
- 定期的な調整の間隔 (秒単位)。デフォルトは
120000(2 分) です。 - 7
- ロギングメッセージの出力レベル。レベルを、
ERROR、WARNING、INFO、DEBUG、またはTRACEに設定できます。 - 8
- ガベッジコレクション (GC) ロギングを有効にします。デフォルトは
trueです。 - 9
- 認証局の有効期間。デフォルトは
365日です。 - 10
- 認証局の更新期間。更新期間は、現在の証明書の有効期日から逆算されます。デフォルトでは、古い証明書が期限切れになる前の証明書の更新期間は
30日です。 - 11
- (任意) User Operator を実行する JVM に使用される Java オプション。
- 12
- (任意) User Operator に設定されたデバッグ (
-D) オプション。 - 13
- (オプション)User Operator によって作成される OpenShift シークレットの名前の接頭辞。
- 14
- (任意)Kafka クラスターが Kafka Admin API を使用した承認 ACL ルールの管理をサポートするかどうかを示します。
falseに設定すると、User Operator はsimple承認 ACL ルールを持つすべてのリソースを拒否します。これは、Kafka クラスターログで不要な例外を回避するのに役立ちます。デフォルトはtrueです。 - 15
- (オプション) 期限切れのユーザー証明書が更新されるメンテナンス時間枠を定義する Cron 式のセミコロンで区切られたリスト。
TLS を使用して Kafka クラスターに接続する場合は、接続の認証に使用されるシークレットを指定します。それ以外の場合は、次のステップに進みます。
TLS 設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow User Operator をデプロイします。
oc create -f install/user-operator
oc create -f install/user-operatorCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments
oc get deploymentsCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE strimzi-user-operator 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE strimzi-user-operator 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.2. Kafka Connect のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。
AMQ Streams では、Kafka Connect は分散 (distributed) モードでデプロイされます。Kafka Connect はスタンドアロンモードでも動作しますが、AMQ Streams ではサポートされません。
Kafka Connect は、コネクター の概念を使用し、スケーラビリティーと信頼性を維持しながら Kafka クラスターで大量のデータを移動するフレームワークを提供します。
Kafka Connect は通常、Kafka を外部データベース、ストレージシステム、およびメッセージングシステムと統合するために使用されます。
Cluster Operator は、KafkaConnect リソースを使用してデプロイされた Kafka Connect クラスターと、KafkaConnector リソースを使用して作成されたコネクターを管理します。
次の手順は、Kafka Connect をデプロイし、ストリーミングデータ用のコネクターを設定する方法を示しています。
コネクター という用語は、Kafka Connect クラスター内で実行されているコネクターインスタンスや、コネクタークラスと同じ意味で使用されます。本ガイドでは、本文の内容で意味が明確である場合に コネクター という用語を使用します。
6.2.1. Kafka Connect の OpenShift クラスターへのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して Kafka Connect クラスターを OpenShift クラスターにデプロイする方法を説明します。
Kafka Connect クラスターは、設定可能なノード数 (このノードの別称: ワーカー) を指定して Deployment として実装されます。このノードは、メッセージフローのスケーラビリティーと信頼性が高くなるように、コネクターのワークロードを タスク として分散します。
デプロイメントでは、YAML ファイルの仕様を使用して KafkaConnect リソースが作成されます。
AMQ Streams には、設定ファイルのサンプル が用意されています。この手順では、以下のサンプルファイルを使用します。
-
examples/connect/kafka-connect.yaml
手順
Kafka Connect を OpenShift クラスターにデプロイします。
examples/connect/kafka-connect.yamlファイルを使用して Kafka Connect をデプロイします。oc apply -f examples/connect/kafka-connect.yaml
oc apply -f examples/connect/kafka-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments -n <my_cluster_operator_namespace>
oc get deployments -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE my-connect-cluster-connect 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE my-connect-cluster-connect 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-connect-clusterは、Kafka Connect クラスターの名前です。READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.2.2. 複数インスタンスの Kafka Connect 設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect のインスタンスを複数実行している場合は、以下の config プロパティーのデフォルト設定を変更する必要があります。
group.id が同じすべての Kafka Connect インスタンスで、これら 3 つのトピックの値を揃える必要があります。
デフォルト設定を変更しない限り、同じ Kafka クラスターに接続する Kafka Connect インスタンスはそれぞれ同じ値でデプロイされます。事実上、すべてのインスタンスが結合されてクラスターで実行されて同じトピックを使用するようになります。
複数の Kafka Connect クラスターが同じトピックの使用を試みると、Kafka Connect は想定どおりに動作せず、エラーが生成されます。
複数の Kafka Connect インスタンスを実行する場合は、インスタンスごとにこれらのプロパティーの値を変更してください。
6.2.3. コネクタープラグインでの Kafka Connect の拡張 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect は、コネクターインスタンスを使用して他のシステムと統合し、データをストリーミングします。コネクターは、次のいずれかのタイプにすることができます。
- データを Kafka にプッシュするソースコネクター
- Kafka からデータを抽出するシンクコネクター
このセクションの手順では、次のいずれかを実行してコネクターを追加する方法について説明します。
- 「AMQ Streams を使用した新しいコンテナーイメージの自動作成」
- 「Kafka Connect ベースイメージからの Docker イメージの作成」(手動または継続的インテグレーションを使用)
Kafka Connect REST API または KafkaConnector カスタムリソースを使用 して、直接コネクターの設定を作成します。
独自のコネクターを使用するか、サンプルの FileStreamSourceConnector および FileStreamSinkConnector コネクターを試して、ファイルベースのデータを Kafka クラスターに出し入れすることができます。サンプルファイルコネクターを KafkaConnector リソースとしてデプロイする方法は、「サンプル KafkaConnector リソースのデプロイ」 を参照してください。
Apache Kafka 3.1.0 までは、Kafka Connect の AMQ Streams コンテナーイメージにサンプルファイルコネクターが含まれていました。Apache Kafka 3.1.1 および 3.2.0 以降、これらのコネクターは含まれなくなり、他のコネクターと同様にデプロイする必要があります。
6.2.3.1. AMQ Streams を使用した新しいコンテナーイメージの自動作成 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、AMQ Streams が追加のコネクターで新しいコンテナーイメージを自動的にビルドするように Kafka Connect を設定する方法を説明します。コネクタープラグインは、KafkaConnect カスタムリソースの .spec.build.plugins プロパティーを使用して定義します。AMQ Streams はコネクタープラグインを自動的にダウンロードし、新しいコンテナーイメージに追加します。コンテナーは、.spec.build.output に指定されたコンテナーリポジトリーにプッシュされ、Kafka Connect デプロイメントで自動的に使用されます。
前提条件
- Cluster Operator がデプロイされている。
- コンテナーレジストリー。
イメージをプッシュ、保存、およびプルできる独自のコンテナーレジストリーを提供する必要があります。AMQ Streams は、プライベートコンテナーレジストリーだけでなく、Quay や Docker Hub などのパブリックレジストリーもサポートします。
手順
.spec.build.outputでコンテナーレジストリーを、.spec.build.pluginsで追加のコネクターを指定して、KafkaConnectカスタムリソースを設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow リソースを作成または更新します。
oc apply -f KAFKA-CONNECT-CONFIG-FILE
$ oc apply -f KAFKA-CONNECT-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow - 新しいコンテナーイメージがビルドされ、Kafka Connect クラスターがデプロイされるまで待ちます。
- Kafka Connect REST API または KafkaConnector カスタムリソースを使用して、追加したコネクタープラグインを使用します。
6.2.3.2. Kafka Connect ベースイメージからの Docker イメージの作成 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、カスタムイメージを作成し、/opt/kafka/plugins ディレクトリーに追加する方法を説明します。
Red Hat Ecosystem Catalog の Kafka コンテナーイメージは、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。
AMQ Stream バージョンの Kafka Connect は起動時に、/opt/kafka/plugins ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。
手順
registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 をベースイメージとして使用して、新規のDockerfileを作成します。FROM registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
FROM registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001Copy to Clipboard Copied! Toggle word wrap Toggle overflow プラグインファイルの例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記この例では、MongoDB、MySQL、および PostgreSQL 用の Debezium コネクターを使用します。Kafka Connect で実行されている Debezium は、他の Kafka Connect タスクと同じように表示されます。
- コンテナーイメージをビルドします。
- カスタムイメージをコンテナーレジストリーにプッシュします。
新しいコンテナーイメージを示します。
以下のいずれかを行います。
KafkaConnectカスタムリソースのKafkaConnect.spec.imageプロパティーを編集します。設定された場合、このプロパティーによって Cluster Operator の
STRIMZI_KAFKA_CONNECT_IMAGES変数がオーバーライドされます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow または
-
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yamlファイルのSTRIMZI_KAFKA_CONNECT_IMAGES変数を編集して新しいコンテナーイメージを参照するようにし、Cluster Operator を再インストールします。
6.2.4. コネクターの作成および管理 リンクのコピーリンクがクリップボードにコピーされました!
コネクタープラグインのコンテナーイメージを作成したら、Kafka Connect クラスターにコネクターインスタンスを作成する必要があります。その後、稼働中のコネクターインスタンスを設定、監視、および管理できます。
コネクターは特定の コネクタークラス のインスタンスで、関連のある外部システムとメッセージについて通信する方法を認識しています。コネクターは多くの外部システムで使用でき、独自のコネクターを作成することもできます。
ソース および シンク タイプのコネクターを作成できます。
- ソースコネクター
- ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
- シンクコネクター
- シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。
6.2.4.1. コネクターの作成および管理用の API リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams では、コネクターの作成および管理に 2 つの API が提供されます。
-
KafkaConnectorカスタムリソース (別称: KafkaConnectors) - Kafka Connect REST API
API を使用すると、以下を行うことができます。
- コネクターインスタンスのステータスの確認。
- 稼働中のコネクターの再設定。
- コネクターインスタンスのコネクタータスク数の増減。
- コネクターの再起動。
- 失敗したタスクを含むコネクタータスクの再起動。
- コネクターインスタンスの一時停止。
- 一時停止したコネクターインスタンスの再開。
- コネクターインスタンスの削除。
KafkaConnector カスタムリソース
KafkaConnectors を使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できるため、cURL などの HTTP クライアントは必要ありません。他の Kafka リソースと同様に、KafkaConnector YAML ファイルで、コネクターの目的の状態を宣言して、このファイルを OpenShift クラスターにデプロイしてコネクターインスタンスを作成します。KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
該当する KafkaConnector リソースを更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。該当する KafkaConnector を削除して、コネクターを削除します。
下位バージョンの AMQ Streams との互換性を維持するため、KafkaConnectors はデフォルトで無効になっています。Kafka Connect クラスターの KafkaConnectors を有効にするには、KafkaConnect リソースで strimzi.io/use-connector-resources アノテーションを true に設定します。手順は、Kafka Connect の設定 を参照してください。
KafkaConnectors が有効になると、Cluster Operator によって監視が開始されます。KafkaConnectors に定義された設定と一致するよう、稼働中のコネクターインスタンスの設定を更新します。
AMQ Streams には KafkaConnector 設定ファイルの例が含まれます。これをもとに、Kafka Connector のデプロイを使用して FileStreamSourceConnector と FileStreamSinkConnector を作成して管理できます。
KafkaConnector リソースにアノテーションを付けて、コネクターの再起動 または コネクタータスクの再起動 ができます。
Kafka Connect API
Kafka Connect REST API でサポートされる操作は、Apache Kafka Connect API のドキュメント で説明されています。
Kafka Connect API から KafkaConnectors の使用への切り替え
Kafka Connect API から KafkaConnectors の使用に切り替えると、コネクターを管理できます。スイッチの作成は、次の作業を以下の順序で行います。
-
設定で
KafkaConnectorリソースをデプロイし、コネクターインスタンスを作成します。 -
strimzi.io/use-connector-resourcesアノテーションをtrueに設定して、Kafka Connect 設定で KafkaConnectors を有効にします。
リソースを作成する前に KafkaConnectors を有効にすると、すべてのコネクターが削除されます。
KafkaConnectors から Kafka Connect API の使用に切り替えるには、まず Kafka Connect 設定から KafkaConnectors を有効にするアノテーションを削除します。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。
6.2.4.2. サンプル KafkaConnector リソースのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースは、Cluster Operator によるコネクターの管理に Kubernetes ネイティブのアプローチを提供します。AMQ Streams には、設定ファイルのサンプル が用意されています。この手順では、examples/connect/source-connector.yaml ファイルを使用して、次のコネクターインスタンスを KafkaConnector リソースとして作成します。
-
Kafka ライセンスファイル (ソース) から各行を読み取り、データをメッセージとして単一の Kafka トピックに書き込む
FileStreamSourceConnectorインスタンス。 -
Kafka トピックからメッセージを読み取り、メッセージを一時ファイル (シンク) に書き込む
FileStreamSinkConnectorインスタンス。
または、examples/connect/kafka-connect-build.yaml ファイルを使用して、ファイルコネクターを使用して新しい Kafka Connect イメージを構築することもできます。
Apache Kafka 3.1.0 までは、サンプルファイルコネクタープラグインが Apache Kafka に含まれていました。Apache Kafka の 3.1.1 および 3.2.0 リリースから、例を他のコネクターと同様にプラグインパスに追加する必要があります。詳細は、コネクタープラグインを使用した Kafka Connect の拡張 を参照してください。
「コネクタープラグインでの Kafka Connect の拡張」 で説明されているように、実稼働環境では、必要な Kafka Connect コネクターを使用してコンテナーイメージを準備します。
FileStreamSourceConnector および FileStreamSinkConnector が例として提供されています。ここで記載されているようなコンテナーで上記のコネクターを実行することは、実稼働のユースケースとして適切ではありません。
前提条件
- Kafka Connect デプロイメント。
- Kafka Connect デプロイメントで KafkaConnectors が有効である。
- Cluster Operator が稼働している。
手順
examples/connect/source-connector.yamlファイルを編集します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- コネクターの名前として使用される
KafkaConnectorリソースの名前。OpenShift リソースで有効な名前を使用します。 - 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect
Tasksの最大数。 - 5
- キーと値のペアとしての コネクター設定。
- 6
- このサンプルソースコネクター設定では、
/opt/kafka/LICENSEファイルからデータが読み取られます。 - 7
- ソースデータのパブリッシュ先となる Kafka トピック。
OpenShift クラスターでソース
KafkaConnectorを作成します。oc apply -f examples/connect/source-connector.yaml
oc apply -f examples/connect/source-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow examples/connect/sink-connector.yamlファイルを作成します。touch examples/connect/sink-connector.yaml
touch examples/connect/sink-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下の YAML を
sink-connector.yamlファイルに貼り付けます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift クラスターにシンク
KafkaConnectorを作成します。oc apply -f examples/connect/sink-connector.yaml
oc apply -f examples/connect/sink-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターリソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connector
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow MY-CONNECT-CLUSTER を Kafka Connect クラスターに置き換えます。
コンテナーで、
kafka -console-consumer.shを実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginningCopy to Clipboard Copied! Toggle word wrap Toggle overflow
ソースおよびシンクコネクターの設定オプション
コネクター設定は、KafkaConnector リソースの spec.config プロパティーで定義されます。
FileStreamSourceConnector クラスおよび FileStreamSinkConnector クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。
| 名前 | タイプ | デフォルト値 | 説明 |
|---|---|---|---|
|
| 文字列 | Null | メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。 |
|
| List | Null | データのパブリッシュ先となる Kafka トピック。 |
| 名前 | タイプ | デフォルト値 | 説明 |
|---|---|---|---|
|
| 文字列 | Null | メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。 |
|
| List | Null | データの読み取り元となる 1 つ以上の Kafka トピック。 |
|
| 文字列 | Null | データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。 |
6.2.4.3. Kafka コネクターの再起動の実行 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift アノテーションを使用して Kafka コネクターの再起動を手動でトリガーする方法を説明します。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクターを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターを再起動するには、OpenShift で
KafkaConnectorリソースにアノテーションを付けます。たとえば、oc annotateを使用すると以下のようになります。oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart=true
oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart=trueCopy to Clipboard Copied! Toggle word wrap Toggle overflow 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnectorカスタムリソースから削除されます。
6.2.4.4. Kafka コネクタータスクの再起動の実行 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift アノテーションを使用して Kafka コネクタータスクの再起動を手動でトリガーする方法を説明します。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクタータスクを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorカスタムリソースから再起動するタスクの ID を検索します。タスク ID は 0 から始まる負の値ではない整数です。oc describe KafkaConnector KAFKACONNECTOR-NAME
oc describe KafkaConnector KAFKACONNECTOR-NAMECopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクタータスクを再起動するには、OpenShift で
KafkaConnectorリソースにアノテーションを付けます。たとえば、oc annotateを使用してタスク 0 を再起動します。oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart-task=0
oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart-task=0Copy to Clipboard Copied! Toggle word wrap Toggle overflow 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnectorカスタムリソースから削除されます。
6.2.4.5. Kafka Connect API の公開 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。サービスは、Kafka Connect インスタンスの作成時に作成されます。
strimzi.io/use-connector-resources アノテーションは KafkaConnectors を有効にします。アノテーションを KafkaConnect リソース設定に適用した場合、そのアノテーションを削除して Kafka Connect API を使用する必要があります。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。
コネクター設定を JSON オブジェクトとして追加できます。
コネクター設定を追加するための curl 要求の例
API には OpenShift クラスター内でのみアクセスできます。OpenShift クラスター外部で実行しているアプリケーションに Kafka Connect API がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Connect API を手動で公開できます。
-
LoadBalancerまたはNodePortタイプのサービス -
Ingressリソース - OpenShift ルート
接続は安全ではないため、外部からのアクセスはよく考えてから許可してください。
サービスを作成する場合には、<connect_cluster_name>-connect-api サービスの selector からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。
サービスのセレクター設定
また、外部クライアントからの HTTP 要求を許可する NetworkPolicy を作成する必要もあります。
Kafka Connect API への要求を許可する NetworkPolicy の例
- 1
- API への接続が許可される Pod のラベル。
クラスター外でコネクター設定を追加するには、curl コマンドで API を公開するリソースの URL を使用します。
6.3. Kafka MirrorMaker のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator によって、1 つ以上の Kafka MirrorMaker のレプリカがデプロイされ、Kafka クラスターの間でデータが複製されます。このプロセスは、Kafka パーティションのレプリケーションの概念と混同しないように、ミラーリングと呼ばれます。MirrorMaker は、ソースクラスターからメッセージを消費し、これらのメッセージをターゲットクラスターにパブリッシュします。
6.3.1. Kafka MirrorMaker の OpenShift クラスターへのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して Kafka MirrorMaker クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使用して、デプロイされた MirrorMaker のバージョンに応じて KafkaMirrorMaker または KafkaMirrorMaker2 リソースが作成されます。
Kafka MirrorMaker 1 (ドキュメントでは単に MirrorMaker と呼ばれる) は Apache Kafka 3.0.0 で非推奨となり、Apache Kafka 4.0.0 で削除されます。そのため、Kafka MirrorMaker 1 のデプロイに使用される KafkaMirrorMaker カスタムリソースも、AMQ Streams で非推奨となりました。Apache Kafka 4.0.0 を導入すると、KafkaMirrorMaker リソースは AMQ Streams から削除されます。代わりに、IdentityReplicationPolicy で KafkaMirrorMaker2 カスタムリソースを使用します。
AMQ Streams には、設定ファイルのサンプル が用意されています。この手順では、以下のサンプルファイルを使用します。
-
examples/mirror-maker/kafka-mirror-maker.yaml -
examples/mirror-maker/kafka-mirror-maker-2.yaml
手順
Kafka MirrorMaker を OpenShift クラスターにデプロイします。
MirrorMaker の場合
oc apply -f examples/mirror-maker/kafka-mirror-maker.yaml
oc apply -f examples/mirror-maker/kafka-mirror-maker.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow MirrorMaker 2.0 の場合
oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yaml
oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments -n <my_cluster_operator_namespace>
oc get deployments -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE my-mirror-maker-mirror-maker 1/1 1 1 my-mm2-cluster-mirrormaker2 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE my-mirror-maker-mirror-maker 1/1 1 1 my-mm2-cluster-mirrormaker2 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-mirror-makerは、Kafka MirrorMaker クラスターの名前です。my-mm2-clusterは Kafka MirrorMaker 2.0 クラスターの名前です。READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.4. Kafka Bridge のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator によって、1 つ以上の Kafka Bridge のレプリカがデプロイされ、HTTP API 経由で Kafka クラスターとクライアントの間でデータが送信されます。
6.4.1. Kafka Bridge を OpenShift クラスターへデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して Kafka Bridge クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使用して KafkaBridge リソースが作成されます。
AMQ Streams には、設定ファイルのサンプル が用意されています。この手順では、以下のサンプルファイルを使用します。
-
examples/bridge/kafka-bridge.yaml
手順
Kafka Bridge を OpenShift クラスターにデプロイします。
oc apply -f examples/bridge/kafka-bridge.yaml
oc apply -f examples/bridge/kafka-bridge.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get deployments -n <my_cluster_operator_namespace>
oc get deployments -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY UP-TO-DATE AVAILABLE my-bridge-bridge 1/1 1 1
NAME READY UP-TO-DATE AVAILABLE my-bridge-bridge 1/1 1 1Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-bridgeは、Kafka Bridge クラスターの名前です。READYは、Ready/expected 状態のレプリカ数を表示します。AVAILABLE出力に1が表示されれば、デプロイメントは成功しています。
6.4.2. Kafka Bridge サービスのローカルマシンへの公開 リンクのコピーリンクがクリップボードにコピーされました!
ポート転送を使用して AMQ Streams の Kafka Bridge サービスを http://localhost:8080 上でローカルマシンに公開します。
ポート転送は、開発およびテストの目的でのみ適切です。
手順
OpenShift クラスターの Pod の名前をリストします。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow ポート
8080で Kafka Bridge Pod に接続します。oc port-forward pod/quickstart-bridge-589d78784d-9jcnr 8080:8080 &
oc port-forward pod/quickstart-bridge-589d78784d-9jcnr 8080:8080 &Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記ローカルマシンのポート 8080 がすでに使用中の場合は、代わりの HTTP ポート (
8008など) を使用します。
これで、API リクエストがローカルマシンのポート 8080 から Kafka Bridge Pod のポート 8080 に転送されるようになります。
6.4.3. OpenShift 外部の Kafka Bridge へのアクセス リンクのコピーリンクがクリップボードにコピーされました!
デプロイメント後、AMQ Streams Kafka Bridge には同じ OpenShift クラスターで実行しているアプリケーションのみがアクセスできます。これらのアプリケーションは、<kafka_bridge_name>-bridge-service サービスを使用して API にアクセスします。
OpenShift クラスター外部で実行しているアプリケーションに Kafka Bridge がアクセスできるようにする場合は、以下の機能のいずれかを作成して Kafka Bridge を手動で公開できます。
-
LoadBalancerまたはNodePortタイプのサービス -
Ingressリソース - OpenShift ルート
サービスを作成する場合には、<kafka_bridge_name>-bridge-service サービスの selector からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。
# ...
selector:
strimzi.io/cluster: kafka-bridge-name
strimzi.io/kind: KafkaBridge
#...
# ...
selector:
strimzi.io/cluster: kafka-bridge-name
strimzi.io/kind: KafkaBridge
#...
- 1
- OpenShift クラスターでの Kafka Bridge カスタムリソースの名前。
第7章 Kafka クラスターへのクライアントアクセスの設定 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams のデプロイ 後、本章では以下の操作を行う方法について説明します。
- サンプルプロデューサーおよびコンシューマークライアントをデプロイし、これを使用してデプロイメントを検証する
Kafka クラスターへの外部クライアントアクセスを設定する
OpenShift 外部のクライアントに Kafka クラスターへのアクセスを設定する手順はより複雑で、Kafka コンポーネントの設定手順 に精通している必要があります。
7.1. サンプルクライアントのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、ユーザーが作成した Kafka クラスターを使用してメッセージを送受信するプロデューサーおよびコンシューマークライアントの例をデプロイする方法を説明します。
前提条件
- クライアントが Kafka クラスターを使用できる。
手順
Kafka プロデューサーをデプロイします。
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topicCopy to Clipboard Copied! Toggle word wrap Toggle overflow - プロデューサーが稼働しているコンソールにメッセージを入力します。
- Enter を押してメッセージを送信します。
Kafka コンシューマーをデプロイします。
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginningCopy to Clipboard Copied! Toggle word wrap Toggle overflow - コンシューマーコンソールに受信メッセージが表示されることを確認します。
7.2. OpenShift 外クライアントのアクセスの設定 リンクのコピーリンクがクリップボードにコピーされました!
以下の手順では、OpenShift 外部からの Kafka クラスターへのクライアントアクセスを設定する方法を説明します。
Kafka クラスターのアドレスを使用して、異なる OpenShift namespace または完全に OpenShift 外のクライアントに外部アクセスを提供できます。
アクセスを提供するために、外部 Kafka リスナーを設定します。
以下のタイプの外部リスナーがサポートされます。
-
OpenShift
Routeおよびデフォルトの HAProxy ルーターを使用するroute -
ロードバランサーサービスを使用する
loadbalancer -
OpenShift ノードのポートを使用する
nodeport -
OpenShift Ingress と NGINX Ingress Controller for Kubernetes を使用する
ingress
要件ならびにお使いの環境およびインフラストラクチャーに応じて、選択するタイプは異なります。たとえば、ロードバランサーは、ベアメタルなどの特定のインフラストラクチャーには適さない場合があります。ベアメタルでは、ノードポートがより適したオプションを提供します。
以下の手順では、
- TLS 暗号化および認証、ならびに Kafka 簡易承認 を有効にして、Kafka クラスターに外部リスナーが設定されます。
-
簡易承認 用に TLS 認証および アクセス制御リスト (ACL) を定義して、クライアントに
KafkaUserが作成されます。
TLS、SCRAM-SHA-512、または OAuth 2.0 認証を使用するようにリスナーを設定できます。TLS は常に暗号化を使用しますが、SCRAM-SHA-512 および OAuth 2.0 認証でも暗号化を使用することが推奨されます。
Kafka ブローカーにシンプルな、OAuth 2.0、OPA、またはカスタム承認を設定できます。承認を有効にすると、承認は有効なすべてのリスナーに適用されます。
KafkaUser 認証および承認メカニズムを設定する場合は、必ず同等の Kafka 設定と一致させてください。
-
KafkaUser.spec.authenticationはKafka.spec.kafka.listeners[*].authenticationと一致します。 -
KafkaUser.spec.authorizationはKafka.spec.kafka.authorizationと一致します。
KafkaUser に使用する認証をサポートするリスナーが少なくとも 1 つ必要です。
Kafka ユーザーと Kafka ブローカー間の認証は、それぞれの認証設定によって異なります。たとえば、TLS が Kafka 設定で有効になっていない場合は、TLS でユーザーを認証できません。
AMQ Streams Operator により設定プロセスが自動されます。
- Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。
- User Operator はクライアントに対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。
この手順では、Cluster Operator によって生成された証明書が使用されますが、独自の証明書をインストール してそれらを置き換えることができます。外部認証局によって管理される Kafka リスナー証明書を使用するようにリスナーを設定 することもできます。
PKCS #12 (.p12) 形式および PEM (.crt) 形式の証明書を利用できます。この手順では、PKCS #12 証明書を説明します。
前提条件
- クライアントが Kafka クラスターを使用できる。
- Cluster Operator および User Operator がクラスターで実行されている必要があります。
- OpenShift クラスター外のクライアントが Kafka クラスターに接続できる。
手順
externalKafka リスナーと共に Kafka クラスターを設定します。- リスナーを通じて Kafka ブローカーにアクセスするのに必要な認証を定義します。
Kafka ブローカーで承認を有効にします。
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 外部リスナーを有効にする設定オプションは、汎用 Kafka リスナースキーマ参照 に記載されています。
- 2
- リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
- 3
- Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
- 4
route、loadbalancer、nodeport、またはingressとして指定される外部リスナータイプ。内部リスナーはinternalとして指定されます。- 5
- リスナーで TLS による暗号化を有効にします。デフォルトは
falseです。routeリスナーには TLS 暗号化は必要ありません。 - 6
- 認証は
tlsとして指定されます。 - 7
- (任意設定:
nodeportリスナーのみ) ノードアドレスとして AMQ Streams により使用される最初のアドレスタイプの希望を指定します。 - 8
- (任意設定) AMQ Streams はクライアントに公開するアドレスを自動的に決定します。アドレスは OpenShift によって自動的に割り当てられます。AMQ Streams を実行しているインフラストラクチャーが正しいアドレスを提供しない場合、ブートストラップおよびブローカーサービス アドレスを上書きできます。検証はオーバーライドに対しては実行されません。オーバーライド設定はリスナーのタイプによって異なります。たとえば、
routeの場合はホストを、loadbalancerの場合は DNS 名または IP アドレスを、またnodeportの場合はノードポートを、それぞれ上書きすることができます。 - 9
simpleと指定された承認 (AclAuthorizerKafka プラグインを使用する)。- 10
- (任意設定) スーパーユーザーは、ACL で定義されたアクセス制限に関係なく、すべてのブローカーにアクセスできます。
警告OpenShift Route アドレスは、Kafka クラスターの名前、リスナーの名前、および作成される namespace の名前で設定されます。たとえば、
my-cluster-kafka-listener1-bootstrap-myproject(CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE) となります。routeリスナータイプを使用している場合、アドレス全体の長さが上限の 63 文字を超えないように注意してください。
Kafkaリソースを作成または更新します。oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クラスターは、TLS 認証を使用する Kafka ブローカーリスナーと共に設定されます。
Kafka ブローカー Pod ごとにサービスが作成されます。
サービスが作成され、Kafka クラスターに接続するための ブートストラップアドレス として機能します。
サービスは、
nodeportリスナーを使用した Kafka クラスターへの外部接続用 外部ブートストラップアドレス としても作成されます。kafka ブローカーのアイデンティティーを検証するクラスター CA 証明書もシークレット
<cluster_name>-cluster-ca-certに作成されます。注記外部リスナーの使用時に Kafka クラスターをスケーリングする場合、すべての Kafka ブローカーのローリング更新がトリガーされる可能性があります。これは設定によって異なります。
Kafkaリソースのステータスからブートストラップアドレスおよびポートを見つけます。oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}'oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クライアントのブートストラップアドレスを使用して、Kafka クラスターに接続します。
Kafka クラスターにアクセスする必要があるクライアントに対応するユーザーを作成または変更します。
KafkaUserリソースを作成または変更します。oc apply -f USER-CONFIG-FILE
oc apply -f USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaUserリソースと同じ名前の Secret と共に、ユーザーが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow パブリッククラスター CA 証明書を必要な証明書形式にデプロイメントします。
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow パスワードファイルからパスワードを抽出します。
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordoc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow パブリッククラスター証明書の認証情報でクライアントを設定します。
クライアントコードのサンプル
properties.put("security.protocol","SSL"); properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12"); properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD); properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12");properties.put("security.protocol","SSL");1 properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12");2 properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD);3 properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12");4 Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記TLS 経由で SCRAM-SHA 認証を使用する場合は、
security.protocol: SASL_SSLを使用します。ユーザー Secret から必要な証明書形式にユーザー CA 証明書を抽出します。
oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow パスワードファイルからパスワードを抽出します。
oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.passwordoc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow ユーザー CA 証明書の認証情報でクライアントを設定します。
クライアントコードのサンプル
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12"); properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>"); properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12");
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12");1 properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>");2 properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12");3 Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クラスターに接続するためのブートストラップアドレスおよびポートを追加します。
bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
bootstrap.servers: BOOTSTRAP-ADDRESS:PORTCopy to Clipboard Copied! Toggle word wrap Toggle overflow
第8章 AMQ Streams のメトリクスおよびダッシュボードの設定 リンクのコピーリンクがクリップボードにコピーされました!
Prometheus および Grafana を使用して、AMQ Streams デプロイメントを監視できます。
ダッシュボードでキーメトリクスを表示し、特定の条件下でトリガーされるアラートを設定すると、AMQ Streams デプロイメントを監視できます。メトリックは、AMQ Streams の各コンポーネントで利用できます。
AMQ Streams は、メトリクス情報を提供するために、Prometheus ルールと Grafana ダッシュボードを使用します。
Prometheus に AMQ Streams の各コンポーネントのルールセットが設定されている場合、Prometheus はクラスターで稼働している Pod からキーメトリックを使用します。次に、Grafana はこれらのメトリックをダッシュボードで可視化します。AMQ Streams には、デプロイメントに合わせてカスタマイズできる Grafana ダッシュボードのサンプルが含まれています。
AMQ Streams は、ユーザー定義プロジェクトのモニタリング (OpenShift 機能) を使用して、Prometheus の設定プロセスを単純化します。
要件に応じて以下を行うことができます。
Prometheus および Grafana が設定されると、監視に AMQ Streams が提供する Grafana ダッシュボードのサンプルを使用できます。
さらに、分散トレーシングを設定 して、エンドツーエンドのメッセージ追跡を行うようにデプロイメントを設定することもできます。
AMQ Streams は、Prometheus と Grafana のインストールファイルの例を提供します。AMQ Streams の監視を試みる際に、このファイルを開始点として使用できます。さらにサポートするには、Prometheus および Grafana 開発者コミュニティーに参加してみてください。
メトリクスおよびモニタリングツールのサポートドキュメント
メトリクスおよびモニタリングツールの詳細は、サポートドキュメントを参照してください。
- Prometheus
- Prometheus の設定
- Kafka Exporter
- Grafana Labs
- Apache Kafka Monitoring では、Apache Kafka により公開される JMX メトリクスについて解説しています。
- ZooKeeper JMX では、Apache Zookeeper により公開される JMX メトリックについて解説しています。
8.1. Kafka Exporter でのコンシューマーラグの監視 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Exporter は、Apache Kafka ブローカーおよびクライアントの監視を強化するオープンソースプロジェクトです。Kafka クラスターで Kafka Exporter をデプロイ するように、Kafka リソースを設定できます。Kafka Exporter は、オフセット、コンシューマーグループ、コンシューマーラグ、およびトピックに関連する Kafka ブローカーから追加のメトリックデータを抽出します。一例として、メトリクスデータを使用すると、低速なコンシューマーの識別に役立ちます。ラグデータは Prometheus メトリクスとして公開され、解析のために Grafana で使用できます。
Kafka Exporter は、コンシューマーラグおよびコンシューマーオフセットに関連する追加のメトリックのみを提供します。通常の Kafka メトリクスでは、Kafka ブローカー で、Prometheus メトリクスを設定する必要があります。
コンシューマーラグは、メッセージの生成と消費の差を示しています。具体的には、指定のコンシューマーグループのコンシューマーラグは、パーティションの最後のメッセージと、そのコンシューマーが現在ピックアップしているメッセージとの時間差を示しています。
ラグには、パーティションログの最後を基準とする、コンシューマーオフセットの相対的な位置が反映されます。
プロデューサーおよびコンシューマーオフセット間のコンシューマーラグ
この差は、Kafka ブローカートピックパーティションの読み取りと書き込みの場所である、プロデューサーオフセットとコンシューマーオフセットの間の デルタ とも呼ばれます。
あるトピックで毎秒 100 個のメッセージがストリーミングされる場合を考えてみましょう。プロデューサーオフセット (トピックパーティションの先頭) と、コンシューマーが読み取った最後のオフセットとの間のラグが 1000 個のメッセージであれば、10 秒の遅延があることを意味します。
コンシューマーラグ監視の重要性
可能な限りリアルタイムのデータの処理に依存するアプリケーションでは、コンシューマーラグを監視して、ラグが過度に大きくならないようにチェックする必要があります。ラグが大きくなるほど、プロセスはリアルタイム処理の目的から遠ざかります。
たとえば、コンシューマーラグは、パージされていない古いデータを大量に消費したり、計画外のシャットダウンが原因である可能性があります。
コンシューマーラグの削減
Grafana のチャートを使用して、ラグを分析し、ラグ削減の方法が対象のコンシューマーグループに影響しているかどうかを確認します。たとえば、ラグを減らすように Kafka ブローカーを調整すると、ダッシュボードには コンシューマーグループ別のラグ のチャートが下降し 毎分のメッセージ消費 のチャートが上昇する状況が示されます。
通常、ラグを削減するには以下を行います。
- 新規コンシューマーを追加してコンシューマーグループをスケールアップします。
- メッセージがトピックに留まる保持時間を延長します。
- ディスク容量を追加してメッセージバッファーを増やします。
コンシューマーラグを減らす方法は、基礎となるインフラストラクチャーや、AMQ Streams によりサポートされるユースケースによって異なります。たとえば、ラグが生じているコンシューマーでは、ディスクキャッシュからフェッチリクエストに対応できるブローカーを活用できる可能性は低いでしょう。場合によっては、コンシューマーの状態が改善されるまで、自動的にメッセージをドロップすることが許容されることがあります。
8.2. Cruise Control 操作の監視 リンクのコピーリンクがクリップボードにコピーされました!
Cruise Control は、ブローカー、トピック、およびパーティションの使用状況を追跡するために Kafka ブローカーを監視します。Cruise Control は、独自のパフォーマンスを監視するためのメトリックのセットも提供します。
Cruise Control メトリックレポーターは、Kafka ブローカーから未加工のメトリックデータを収集します。データは、Cruise Control によって自動的に作成されるトピックに生成されます。メトリックは、Kafka クラスターの最適化提案の生成 に使用されます。
Cruise Control メトリックは、Cruise Control 操作のリアルタイム監視で利用できます。たとえば、Cruise Control メトリックを使用して、実行中のリバランス操作のステータスを監視したり、操作のパフォーマンスで検出された異常についてアラートを提供したりできます。
Cruise Control 設定で Prometheus JMX Exporter を有効にして Cruise Control メトリクスを公開します。
センサー として知られる利用可能な Cruise Control メトリクスの完全なリストは、Cruise Control のドキュメントを 参照してください。
8.2.1. Cruise Control メトリックの公開 リンクのコピーリンクがクリップボードにコピーされました!
Cruise Control 操作でメトリクスを公開する場合は、Kafka リソースを設定して、Cruise Control をデプロイし、デプロイメントで Prometheus メトリクスを有効にします。独自の設定を使用するか、AMQ Streams によって提供される kafka-cruise-control-metrics.yaml ファイルのサンプルを使用できます。
設定を Kafka リソースの CruiseControl プロパティーの metricsConfig に追加します。この設定により、Prometheus JMX Exporter が有効化され、HTTP エンドポイント経由で Cruise Control メトリクスが公開されます。HTTP エンドポイントは Prometheus サーバーによってスクレープされます。
Cruise Control のメトリック設定例
8.2.2. Cruise Control メトリックの表示 リンクのコピーリンクがクリップボードにコピーされました!
Cruise Control メトリクスを公開したら、Prometheus または別の適切なモニタリングシステムを使用して、メトリクスデータの情報を表示できます。Streams for Apache Kafka は、Cruise Control メトリクスの視覚化を表示する Grafana ダッシュボードのサンプル を提供します。ダッシュボードは strimzi-cruise-control.json という名前の JSON ファイルです。公開されるメトリクスは、Grafana ダッシュボードを有効にする 際に監視データを提供します。
8.2.2.1. 分散スコアの監視 リンクのコピーリンクがクリップボードにコピーされました!
Cruise Control メトリックには、分散スコアが含まれます。分散度は、Kafka クラスター内でワークロードがどの程度均等に分散されているかを示す尺度です。
分散スコア (balancedness-score) の Cruise Control メトリクスは、KafkaRebalance リソースの分散スコアとは異なる可能性があります。Cruise Control は anomaly.detection.goals を使用して各スコアを計算します。これは、KafkaRebalance リソースで使用される default.goals と同じでない可能性があります。anomaly.detection.goals は、Kafka カスタムリソースの spec.cruiseControl.config に指定されます。
KafkaRebalance リソースを更新すると、最適化プロポーザルをフェッチします。以下の条件のいずれかが適用されると、キャッシュされた最新の最適化プロポーザルがフェッチされます。
-
KafkaRebalance
goalsは、Kafkaリソースのdefault.goalsセクションに設定されたゴールと一致する。 -
KafkaRebalance
goalsは指定されていない。
これ以外の場合は、Cruise Control は KafkaRebalance goals に基づいて、新しい最適化プロポーザルを生成します。更新ごとに新しいプロポーザルが生成されると、パフォーマンスの監視に影響を及ぼす可能性があります。
8.2.2.2. 異常検出へのアラート リンクのコピーリンクがクリップボードにコピーされました!
Cruise Control の 異常検出 は、ブローカーの障害などの最適化ゴールの生成をブロックする条件のメトリクスデータを提供します。可視性を高める場合は、異常検出器が提供するメトリックを使用して、アラートを設定し、通知を送信できます。Cruise Control の 異常通知機能 を設定して、指定された通知チャネルを介してこれらのメトリクスに基づいてアラートをルーティングできます。または、Prometheus を設定して、異常検出器によって提供されるメトリックデータをスクレープし、アラートを生成することもできます。その後、Prometheus Alertmanager は Prometheus で生成されるアラートをルーティングできます。
Cruise Control ドキュメント には、AnomalyDetector メトリクスおよび異常通知機能に関する情報が記載されています。
8.3. メトリクスファイルの例 リンクのコピーリンクがクリップボードにコピーされました!
Grafana ダッシュボードおよびその他のメトリクス設定ファイルの例は、AMQ Streams によって提供される 設定ファイルの例 を参照してください。
AMQ Streams で提供されるサンプルメトリクスファイル
- 1
- 異なる AMQ Streams コンポーネントの Grafana ダッシュボードの例。
- 2
- Grafana イメージのインストールファイル。
- 3
- CPU、メモリー、およびディスクボリュームの使用状況についてのメトリックをスクレープする追加の設定。これらのメトリックは、ノード上の OpenShift cAdvisor エージェントおよび kubelet から直接提供されます。
- 4
- Alertmanager による通知送信のためのフック定義。
- 5
- Alertmanager をデプロイおよび設定するためのリソース。
- 6
- Prometheus Alertmanager と使用するアラートルールの例 (Prometheus とデプロイ)。
- 7
- Prometheus イメージのインストールリソースファイル。
- 8
- Prometheus Operator によって Prometheus サーバーのジョブに変換される PodMonitor の定義。これにより、Pod から直接メトリックデータをスクレープできます。
- 9
- メトリックが有効になっている Kafka Bridge リソース。
- 10
- Kafka Connect に対する Prometheus JMX Exporter の再ラベル付けルールを定義するメトリック設定。
- 11
- Cruise Control に対する Prometheus JMX Exporter の再ラベル付けルールを定義するメトリック設定。
- 12
- Kafka および ZooKeeper に対する Prometheus JMX Exporter の再ラベル付けルールを定義するメトリック設定。
- 13
- Kafka Mirror Maker 2.0 に対する Prometheus JMX Exporter の再ラベル付けルールを定義するメトリクス設定。
8.3.1. Prometheus メトリクス設定の例 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams は、Prometheus JMX Exporter を使用して、Prometheus サーバーによってスクレープできる HTTP エンドポイント経由でメトリクスを公開します。
Grafana ダッシュボードが依存する Prometheus JMX Exporter の再ラベル付けルールは、カスタムリソース設定として AMQ Streams コンポーネントに対して定義されます。
ラベルは名前と値のペアです。再ラベル付けは、ラベルを動的に書き込むプロセスです。たとえば、ラベルの値は Kafka サーバーおよびクライアント ID の名前から派生されます。
AMQ Streams では、再ラベル付けルールが含まれるカスタムリソース設定用の YAML ファイルのサンプルが提供されます。Prometheus メトリック設定をデプロイする場合、カスタムリソースのサンプルをデプロイすることや、メトリック設定を独自のカスタムリソース定義にコピーすることができます。
| コンポーネント | カスタムリソース | サンプル YAML ファイル |
|---|---|---|
| Kafka および ZooKeeper |
|
|
| Kafka Connect |
|
|
| Kafka MirrorMaker 2.0 |
|
|
| Kafka Bridge |
|
|
| Cruise Control |
|
|
8.3.2. アラート通知の Prometheus ルールの例 リンクのコピーリンクがクリップボードにコピーされました!
アラート通知の Prometheus ルールの例は、AMQ Streams によって提供される メトリクス設定ファイルの例 と共に提供されます。ルールは、Prometheus デプロイメント で使用するための prometheus-rules.yaml ファイルのサンプルに指定されています。
アラートルールによって、メトリックで監視される特定条件についての通知が提供されます。ルールは Prometheus サーバーで宣言されますが、アラート通知は Prometheus Alertmanager で対応します。
Prometheus アラートルールでは、継続的に評価される PromQL 表現を使用して条件が記述されます。
アラート表現が true になると、条件が満たされ、Prometheus サーバーからアラートデータが Alertmanager に送信されます。次に Alertmanager は、そのデプロイメントに設定された通信方法を使用して通知を送信します。
アラートルールの定義に関する一般的な留意点:
-
forプロパティーは、ルールと併用し、アラートがトリガーされるまでに、条件を維持する必要のある期間を決定します。 -
ティック (tick) は ZooKeeper の基本的な時間単位です。ミリ秒単位で測定され、
Kafka.spec.zookeeper.configのtickTimeパラメーターを使用して設定されます。たとえば、ZooKeeper でtickTime=3000の場合、3 ティック (3 x 3000) は 9000 ミリ秒と等しくなります。 -
ZookeeperRunningOutOfSpaceメトリックおよびアラートを利用できるかどうかは、使用される OpenShift 設定およびストレージ実装によります。特定のプラットフォームのストレージ実装では、メトリクスによるアラートの提供に必要な利用可能な領域について情報が提供されない場合があります。
Alertmanager は、電子メール、チャットメッセージなどの通知方法を使用するように設定できます。ルールの例に含まれるデフォルト設定は、特定のニーズに合わせて調整してください。
8.3.2.1. ルールの変更例 リンクのコピーリンクがクリップボードにコピーされました!
prometheus-rules.yaml ファイルには、以下のコンポーネントのルールの例が含まれます。
- Kafka
- ZooKeeper
- Entitiy Operator
- Kafka Connect
- Kafka Bridge
- MirrorMaker
- Kafka Exporter
各ルールの例の説明は、ファイルに記載されています。
8.3.3. Grafana ダッシュボードのサンプル リンクのコピーリンクがクリップボードにコピーされました!
Prometheus をデプロイしてメトリックを提供する場合は、AMQ Streams で提供される Grafana ダッシュボードのサンプルを使用して、AMQ Streams コンポーネントを監視できます。
ダッシュボードのサンプルは、examples/metrics/grafana-dashboards ディレクトリーに JSON ファイルで提供されます。
すべてのダッシュボードは、JVM メトリクスに加えてコンポーネントに固有のメトリクスを提供します。たとえば、AMQ Streams Operator の Grafana ダッシュボードは、調整の数または処理中のカスタムリソースに関する情報を提供します。
ダッシュボードのサンプルには、Kafka でサポートされるすべてのメトリクスは表示されません。ダッシュボードには、監視用の代表的なメトリックのセットが表示されます。
| コンポーネント | JSON ファイルの例: |
|---|---|
| AMQ Streams の Operator |
|
| Kafka |
|
| ZooKeeper |
|
| Kafka Connect |
|
| Kafka MirrorMaker 2.0 |
|
| Kafka Bridge |
|
| Cruise Control |
|
| Kafka Exporter |
|
8.4. Prometheus メトリクス設定のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Prometheus メトリック設定をデプロイし、AMQ Streams で Prometheus を使用します。metricsConfig プロパティーを使用して、Prometheus メトリクスを有効化および設定します。
独自の設定、または AMQ Streams で提供されるサンプルのカスタムリソース設定ファイル を使用できます。
-
kafka-metrics.yaml -
kafka-connect-metrics.yaml -
kafka-mirror-maker-2-metrics.yaml -
kafka-bridge-metrics.yaml -
kafka-cruise-control-metrics.yaml
設定ファイルのサンプルには、再ラベル付けルールと Prometheus メトリックの有効化に必要な設定があります。Prometheus は、ターゲット HTTP エンドポイントからメトリクスを収集します。サンプルファイルは、AMQ Streams で Prometheus を試すのに適した方法です。
再ラベル付けルールおよびメトリクス設定を適用するには、以下のいずれかを行います。
- 独自のカスタムリソースに設定例をコピーする。
- メトリクス設定でカスタムリソースをデプロイする。
Kafka Exporter メトリックを含める場合は、kafkaExporter 設定を Kafka リソースに追加します。
Kafka Exporter は、コンシューマーラグおよびコンシューマーオフセットに関連する追加のメトリックのみを提供します。通常の Kafka メトリクスでは、Kafka ブローカー で、Prometheus メトリクスを設定する必要があります。
この手順では、Kafka リソースに Prometheus メトリクス設定をデプロイする方法を説明します。このプロセスは、他のリソースのサンプルファイルを使用する場合と同じです。
手順
Prometheus 設定でカスタムリソースのサンプルをデプロイします。
たとえば、
Kafkaリソースごとにkafka-metrics.yamlファイルを適用します。サンプル設定のデプロイ
oc apply -f kafka-metrics.yaml
oc apply -f kafka-metrics.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow または、
kafka-metrics.yamlの設定例を独自のKafkaリソースにコピーすることもできます。サンプル設定のコピー
oc edit kafka <kafka-configuration-file>
oc edit kafka <kafka-configuration-file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow metricsConfigプロパティーと、Kafkaリソースを参照するConfigMapをコピーします。Kafka のメトリクス設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記Kafka Bridge の場合、
enableMetricsプロパティーを指定し、これをtrueに設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka Exporter をデプロイするには、
kafkaExporter設定を追加します。KafkaExporter設定は、Kafkaリソースでのみ指定されます。Kafka Exporter のデプロイの設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.5. OpenShift での Kafka メトリックおよびダッシュボードの表示 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams が OpenShift Container Platform にデプロイされると、ユーザー定義プロジェクトのモニタリング によりメトリクスが提供されます。この OpenShift 機能により、開発者は独自のプロジェクト (例: Kafka プロジェクト) を監視するために別の Prometheus インスタンスにアクセスできます。
ユーザー定義プロジェクトのモニタリングが有効な場合は、openshift-user-workload-monitoring プロジェクトには以下のコンポーネントが含まれます。
- Prometheus Operator
- Prometheus インスタンス (Prometheus Operator によって自動的にデプロイされます)
- Thanos Ruler インスタンス
AMQ Streams は、これらのコンポーネントを使用してメトリックを消費します。
クラスター管理者は、ユーザー定義プロジェクトのモニタリングを有効にし、開発者およびその他のユーザーに自分のプロジェクトに含まれるアプリケーションを監視するパーミッションを付与する必要があります。
Grafana のデプロイメント
Grafana インスタンスを、Kafka クラスターが含まれるプロジェクトにデプロイできます。その後、Grafana ダッシュボードのサンプルを使用して、AMQ Streams の Prometheus メトリックを Grafana ユーザーインターフェイスで可視化できます。
openshift-monitoring プロジェクトはコアプラットフォームコンポーネントをモニタリングできます。このプロジェクトの Prometheus および Grafana コンポーネントを使用して、OpenShift Container Platform 4.x 上の AMQ Streams の監視を設定しないでください。
手順の概要
OpenShift Container Platform で AMQ Streams のモニターングを設定するには、以下の手順を順番に行います。
8.5.1. 前提条件 リンクのコピーリンクがクリップボードにコピーされました!
- YAML ファイルのサンプルを使用して、Prometheus メトリクス設定がデプロイされている。
-
ユーザー定義プロジェクトの監視が有効になっている。クラスター管理者が OpenShift クラスターに
cluster-monitoring-configの Config Map を作成している。 -
クラスター管理者は、
monitoring-rules-editまたはmonitoring-editロールを割り当てている。
cluster-monitoring-config の Config Map の作成およびユーザー定義プロジェクトの監視用のパーミッションをユーザーに付与する方法の詳細は、OpenShift Container Platform の モニタリング を参照してください。
8.5.3. Prometheus リソースのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Prometheus を使用して、Kafka クラスターのモニタリングデータを取得します。
独自の Prometheus デプロイメントを使用するか、AMQ Streams によって提供される メトリクス設定ファイルのサンプル を使用して Prometheus をデプロイできます。サンプルファイルを使用するには、PodMonitor リソースを設定し、デプロイします。PodMonitors は、Apache Kafka、ZooKeeper、Operator、Kafka Bridge、および Cruise Control から直接データをスクレープします。
次に、Alertmanager のアラートルールのサンプルをデプロイします。
前提条件
- Kafka クラスターが稼働している。
- AMQ Streams で 提供されるアラートルールのサンプル を確認している。
手順
ユーザー定義プロジェクトのモニタリングが有効であることを確認します。
oc get pods -n openshift-user-workload-monitoring
oc get pods -n openshift-user-workload-monitoringCopy to Clipboard Copied! Toggle word wrap Toggle overflow 有効であると、モニタリングコンポーネントの Pod が返されます。以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Pod が返されなければ、ユーザー定義プロジェクトのモニタリングは無効になっています。「OpenShift での Kafka メトリックおよびダッシュボードの表示」 の前提条件を参照してください。
複数の
PodMonitorリソースは、examples/metrics/prometheus-install/strimzi-pod-monitor.yamlで定義されます。PodMonitorリソースごとにspec.namespaceSelector.matchNamesプロパティーを編集します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- メトリックをスクレープする Pod が実行されているプロジェクト (例:
Kafka)。
strimzi-pod-monitor.yamlファイルを、Kafka クラスターが稼働しているプロジェクトにデプロイします。oc apply -f strimzi-pod-monitor.yaml -n MY-PROJECT
oc apply -f strimzi-pod-monitor.yaml -n MY-PROJECTCopy to Clipboard Copied! Toggle word wrap Toggle overflow Prometheus ルールのサンプルを同じプロジェクトにデプロイします。
oc apply -f prometheus-rules.yaml -n MY-PROJECT
oc apply -f prometheus-rules.yaml -n MY-PROJECTCopy to Clipboard Copied! Toggle word wrap Toggle overflow
8.5.4. Grafana のサービスアカウントの作成 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams の Grafana インスタンスは、cluster-monitoring-view ロールが割り当てられたサービスアカウントで実行する必要があります。
Grafana を使用してモニタリングのメトリクスを表示する場合は、サービスアカウントを作成します。
前提条件
手順
Grafana の
ServiceAccountを作成します。ここでは、リソースの名前はgrafana-serviceaccountです。Copy to Clipboard Copied! Toggle word wrap Toggle overflow ServiceAccountを、Kafka クラスターが含まれるプロジェクトにデプロイします。oc apply -f GRAFANA-SERVICEACCOUNT -n MY-PROJECT
oc apply -f GRAFANA-SERVICEACCOUNT -n MY-PROJECTCopy to Clipboard Copied! Toggle word wrap Toggle overflow cluster-monitoring-viewロールを GrafanaServiceAccountに割り当てるClusterRoleBindingリソースを作成します。ここでは、リソースの名前はgrafana-cluster-monitoring-bindingです。Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- プロジェクトの名前。
ClusterRoleBindingを、Kafka クラスターが含まれるプロジェクトにデプロイします。oc apply -f <grafana-cluster-monitoring-binding> -n <my-project>
oc apply -f <grafana-cluster-monitoring-binding> -n <my-project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.5.5. Prometheus データソースを使用した Grafana のデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Grafana をデプロイし、Prometheus メトリックを表示します。Grafana アプリケーションには、OpenShift Container Platform モニタリングスタックの設定が必要です。
OpenShift Container Platform では、openshift-monitoring プロジェクトに Thanos Querier インスタンスが含まれています。Thanos Querier は、プラットフォームメトリクスを集約するために使用されます。
必要なプラットフォームメトリクスを使用するには、Grafana インスタンスには Thanos Querier に接続できる Prometheus データソースが必要です。この接続を設定するには、トークンを使用し、Thanos Querier と並行して実行される oauth-proxy サイドカーに対して認証を行う config map を作成します。datasource.yaml ファイルは config map のソースとして使用されます。
最後に、Kafka クラスターが含まれるプロジェクトにボリュームとしてマウントされた config map で Grafana アプリケーションをデプロイします。
手順
Grafana
ServiceAccountのアクセストークンを取得します。oc serviceaccounts get-token grafana-serviceaccount -n MY-PROJECT
oc serviceaccounts get-token grafana-serviceaccount -n MY-PROJECTCopy to Clipboard Copied! Toggle word wrap Toggle overflow 次のステップで使用するアクセストークンをコピーします。
Grafana の Thanos Querier 設定が含まれる
datasource.yamlファイルを作成します。以下に示すように、アクセストークンを
httpHeaderValue1プロパティーに貼り付けます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
GRAFANA-ACCESS-TOKEN: GrafanaServiceAccountのアクセストークンの値。
datasource.yamlファイルからgrafana-configという名前の config map を作成します。oc create configmap grafana-config --from-file=datasource.yaml -n MY-PROJECT
oc create configmap grafana-config --from-file=datasource.yaml -n MY-PROJECTCopy to Clipboard Copied! Toggle word wrap Toggle overflow DeploymentおよびServiceで設定される Grafana アプリケーションを作成します。grafana-configconfig map はデータソース設定のボリュームとしてマウントされます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Grafana アプリケーションを、Kafka クラスターが含まれるプロジェクトにデプロイします。
oc apply -f <grafana-application> -n <my-project>
oc apply -f <grafana-application> -n <my-project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.5.6. Grafana サービスへのルートの作成 リンクのコピーリンクがクリップボードにコピーされました!
Grafana サービスを公開するルートを介して、Grafana ユーザーインターフェイスにアクセスできます。
手順
grafanaサービスへのルートを作成します。oc create route edge <my-grafana-route> --service=grafana --namespace=KAFKA-NAMESPACE
oc create route edge <my-grafana-route> --service=grafana --namespace=KAFKA-NAMESPACECopy to Clipboard Copied! Toggle word wrap Toggle overflow
8.5.7. Grafana ダッシュボードサンプルのインポート リンクのコピーリンクがクリップボードにコピーされました!
Grafana を使用して、カスタマイズ可能なダッシュボードで Prometheus メトリックを視覚化します。
AMQ Streams は、Grafana のダッシュボード設定ファイルのサンプル を JSON 形式で提供します。
-
examples/metrics/grafana-dashboards
この手順では、Grafana ダッシュボードのサンプルを使用します。
ダッシュボードのサンプルは、キーメトリックを監視するを開始点として適していますが、Kafka でサポートされるすべてのメトリックは表示されません。使用するインフラストラクチャーに応じて、ダッシュボードのサンプルの編集や、他のメトリクスの追加を行うことができます。
前提条件
手順
Grafana サービスへのルートの詳細を取得します。以下に例を示します。
oc get routes NAME HOST/PORT PATH SERVICES MY-GRAFANA-ROUTE MY-GRAFANA-ROUTE-amq-streams.net grafana
oc get routes NAME HOST/PORT PATH SERVICES MY-GRAFANA-ROUTE MY-GRAFANA-ROUTE-amq-streams.net grafanaCopy to Clipboard Copied! Toggle word wrap Toggle overflow - Web ブラウザーで、Route ホストおよびポートの URL を使用して Grafana ログイン画面にアクセスします。
ユーザー名とパスワードを入力し、続いて Log In をクリックします。
デフォルトの Grafana ユーザー名およびパスワードは、どちらも
adminです。初回ログイン後に、パスワードを変更できます。- Configuration > Data Sources で、Prometheus データソースが作成済みであることを確認します。データソースは 「Prometheus データソースを使用した Grafana のデプロイ」 に作成されています。
- + アイコンをクリックしてから、Import をクリックします。
-
examples/metrics/grafana-dashboardsで、インポートするダッシュボードの JSON をコピーします。 - JSON をテキストボックスに貼り付け、Load をクリックします。
- 他の Grafana ダッシュボードのサンプル用に、ステップ 5-7 を繰り返します。
インポートされた Grafana ダッシュボードは、Dashboards ホームページから表示できます。
第9章 AMQ Streams のアップグレード リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams をバージョン 2.2 にアップグレードすると、新機能および改良された機能、パフォーマンスの向上、およびセキュリティーオプションを利用できます。
このアップグレード中に、Kafka をサポートされる最新バージョンにアップグレードします。各 Kafka リリースによって、AMQ Streams デプロイメントに新機能、改善点、およびバグ修正が導入されます。
新しいバージョンで問題が発生した場合は、AMQ Streams を以前のバージョンに ダウングレード できます。
リリースされた AMQ Streams バージョンは、AMQ Streams ソフトウェアダウンロードページ から入手できます。
ダウンタイムと可用性
高可用性に対してトピックが設定されている場合、AMQ Streams をアップグレードしても、これらのトピックからデータをパブリッシュおよび読み取るコンシューマーとプロデューサーのダウンタイムは発生しません。高可用性トピックのレプリケーション係数は 3 以上であり、パーティションはブローカー間で均等に分散されます。
AMQ Streams をアップグレードするとローリング更新がトリガーされ、プロセスのさまざまな段階ですべてのブローカーが順に再起動されます。ローリング更新中は、すべてのブローカーがオンラインであるとは限らないため、クラスター全体の可用性 が一時的に低下します。クラスターの可用性が低下すると、ブローカーで障害が発生した場合にメッセージが失われる可能性が高くなります。
9.1. AMQ Streams のアップグレードパス リンクのコピーリンクがクリップボードにコピーされました!
利用できるアップグレードパスには、2 種類あります。
- 増分アップグレード
- AMQ Streams を以前のマイナーバージョンからバージョン 2.2 にアップグレードします。
- マルチバージョンのアップグレード
AMQ Streams を 1 回で古いバージョンからバージョン 2.2 にアップグレードします (1 つ以上の中間バージョンを飛ばします)。
たとえば、AMQ Streams 1.8 から直接 AMQ Streams 2.2 にアップグレードします。
9.1.1. サポート対象の Kafka バージョン リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams のアップグレードプロセスを開始する前に、アップグレードする Kafka バージョンを決定します。サポートされている Kafka のバージョンは Red Hat AMQ Streams Supported Configurations で確認できます。
- Kafka 3.2.3 は実稼働環境での使用でサポートされます。
- Kafka 3.1.0 は、AMQ Streams 2.2 にアップグレードする目的でのみサポートされます。
ご使用中の AMQ Streams バージョンでサポートされている Kafka バージョンのみを使用できます。AMQ Streams のバージョンでサポートされている限り、Kafka の上位バージョンにアップグレードできます。場合によっては、サポートされている以前の Kafka バージョンにダウングレードすることもできます。
9.1.2. 1.7 より前の AMQ Streams バージョンからのアップグレード リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams を 1.7 以前から 2.2 にアップグレードする場合、カスタムリソースが API バージョン v1beta2 を使用していることを確認する必要があります。AMQ Streams 1.8 以降にアップグレードする前に、カスタムリソース定義およびカスタムリソースをアップグレードする必要があります。アップグレードを実行するには、AMQ Streams 1.7 で提供される API 変換ツール を使用できます。詳細は、AMQ Streams 1.7 アップグレードドキュメント を参照してください。
すべてのカスタムリソースの v1beta2 API バージョンが AMQ Streams 1.7 で導入されました。AMQ Streams 1.8 以降では、KafkaTopic および KafkaUser を除くすべての AMQ Streams カスタムリソースから v1alpha1 および v1beta1 API バージョンが削除されました。
バージョン 1.7 より前の AMQ Streams バージョンからアップグレードする場合は、以下を行います。
- AMQ Streams を 1.7 にアップグレードする
-
カスタムリソースの
v1beta2への変換 - AMQ Streams を 1.8 以降にアップグレードする
別の方法として、バージョン 1.7 からカスタムリソースをインストールし、リソースを変換してから 1.8 以降にアップグレードすることもできます。
9.2. 必要なアップグレードシーケンス リンクのコピーリンクがクリップボードにコピーされました!
ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。
OpenShift クラスターのバージョンがサポートされていることを確認してください。
AMQ Streams 2.2 は、OpenShift 4.8 から 4.11 でサポートされます。
-
AMQ Streams を 1.7 以前からアップグレードする場合は、既存のカスタムリソースを更新して、
v1beta2API バージョンをサポートします。 - 新しい AMQ Streams バージョンに Cluster Operator を更新 します。
- サポートされる最新の Kafka バージョンに、すべての Kafka ブローカーとクライアントアプリケーションをアップグレードします。
- オプション: パーティションの再分散に Incremental Cooperative Rebalance プロトコルを使用するために、コンシューマーと Kafka Streams アプリケーションをアップグレードします。
9.3. 最小限のダウンタイムでの OpenShift のアップグレード リンクのコピーリンクがクリップボードにコピーされました!
OpenShift をアップグレードする場合は、OpenShift アップグレードのドキュメントを参照して、アップグレードパスとノードを正しくアップグレードする手順を確認してください。OpenShift をアップグレードする前に、お使いの AMQ Streams バージョンでサポートされるバージョン を確認してください。
アップグレードを実行する際に、Kafka クラスターを利用できるようにしておくことを推奨します。
以下のストラテジーのいずれかを使用できます。
- Pod の Disruption Budget を設定します。
以下の方法の 1 つで Pod をローリングします。
- AMQ Streams Drain Cleaner の使用
- Pod へのアノテーションの手動適用
Pod のローリング手法のいずれかを使用する前に、Pod の Disruption Budget を設定する必要があります。
Kafka を稼働し続けるには、高可用性のためにトピックも複製する必要があります。これには、少なくとも 3 つのレプリケーション係数と、レプリケーション係数よりも 1 つ少ない In-Sync レプリカの最小数を指定するトピック設定が必要です。
高可用性のためにレプリケートされた Kafka トピック
高可用性環境では、Cluster Operator はアップグレードプロセス時にトピックの In-Sync レプリカの最小数を維持し、ダウンタイムが発生しないようにします。
9.3.1. AMQ Streams Drain Cleaner を使用した Pod のローリング リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams Drain Cleaner ツールを使用して、アップグレード時にノードをエビクトできます。AMQ Streams Drain Cleaner は、Pod のローリング更新アノテーションを Pod に付けます。これにより、Cluster Operator に、エビクトされた Pod のローリング更新を実行するように指示します。
Pod の Disruption Budget を使用すると、特定の時点で、指定された数の Pod だけが、利用できなくなります。Kafka ブローカー Pod の計画メンテナンス時に、Pod の Disruption Budget を使用して、Kafka が高可用性環境で引き続き実行されるようにします。
Kafka コンポーネントの template のカスタマイズを使用して、Pod の Disruption Budget を指定します。デフォルトでは、Pod の Disruption Budget は、単一の Pod のみを指定時に利用できないようにします。
これを実行するには、maxUnavailable を 0 (ゼロ) に設定します。Pod の Disruption Budget の最大値をゼロに減らすと、自発的に中断されないため、Pod を手動でエビクトする必要があります。
Pod の Disruption Budget の指定
9.3.2. トピックを利用可能な状態に維持しながらの手動での Pod のローリング リンクのコピーリンクがクリップボードにコピーされました!
アップグレード時に、Cluster Operator 経由で Pod の手動ローリング更新をトリガーできます。Pod リソースを使用して、ローリング更新は新規 Pod でリソースの Pod を再起動します。AMQ Streams Drain Cleaner を使用する場合と同様に、Pod の Disruption Budget の maxUnavailable の値をゼロに設定する必要があります。
ドレイン (解放) する必要のある Pod を監視する必要があります。次に Pod アノテーションを追加して更新を行います。
ここで、アノテーションは Kafka ブローカーを更新します。
Kafka ブローカー Pod での手動ローリング更新の実行
oc annotate pod <cluster_name>-kafka-<index> strimzi.io/manual-rolling-update=true
oc annotate pod <cluster_name>-kafka-<index> strimzi.io/manual-rolling-update=true
<cluster_name> は、クラスターの名前に置き換えます。Kafka ブローカー Pod の名前は <cluster-name>-kafka-<index> です。ここで、<index> はゼロで始まり、レプリカの合計数から 1 を引いた数で終了します。例: my-cluster-kafka-0
9.4. Cluster Operator のアップグレード リンクのコピーリンクがクリップボードにコピーされました!
デプロイの最初の方法と同じ方法を使用して、Cluster Operator をアップグレードします。
- インストールファイルの使用
- インストール用の YAML ファイルを使用して Cluster Operator をデプロイした場合は、インストールファイルを使用した Cluster Operator のアップグレード の説明に従って、Operator のインストールファイルを変更してアップグレードを実行します。
- OperatorHub の使用
OperatorHub から AMQ Streams をデプロイした場合は、Operator Lifecycle Manager (OLM) を使用して AMQ Streams Operator の更新チャネルを新しい AMQ Streams バージョンに変更します。
チャネルを更新すると、選択したアップグレード戦略に応じて、次のタイプのアップグレードのいずれかが開始されます。
- 自動アップグレード
- インストール開始前に承認が必要な手動アップグレード
注記安定した チャンネルに登録すると、チャンネルを変更せずに自動更新を取得できます。ただし、インストール前のアップグレード手順が失われる可能性があるため、自動更新を有効にすることは推奨しません。バージョン固有のチャネルでのみ自動アップグレードを使用します。
OperatorHub を使用した Operator のアップグレードについての詳細は、Upgrading installed Operators (OpenShift documentation) を参照してください。
9.4.1. Cluster Operator をアップグレードすると Kafka バージョンエラーが返される リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator をアップグレードし、サポートされていない Kafka バージョン エラーが発生した場合、Kafka クラスターのデプロイには、新しい Operator バージョンではサポートされていない古い Kafka バージョンがあります。このエラーは、すべてのインストール方法に適用されます。
このエラーが発生した場合は、Kafka をサポートされている Kafka バージョンにアップグレードしてください。Kafka リソースの spec.kafka.version をサポートされているバージョンに変更します。
oc を使用して、Kafka リソースの ステータス でこのようなエラーメッセージを確認できます。
エラーの Kafka ステータスの確認
oc get kafka <kafka_cluster_name> -n <namespace> -o jsonpath='{.status.conditions}'
oc get kafka <kafka_cluster_name> -n <namespace> -o jsonpath='{.status.conditions}'
<kafka_cluster_name> は、Kafka クラスターの名前に、<namespace> は、Pod が実行されている OpenShift namespace に置き換えます。
9.4.2. OperatorHub を使用した AMQ Streams 1.7 以前からのアップグレード リンクのコピーリンクがクリップボードにコピーされました!
OperatorHub を使用して AMQ Streams 1.7 以前からアップグレードする場合に必要となるアクション
Red Hat Integration - AMQ Streams Operator は、v1beta2 カスタムリソースのみをサポートします。OperatorHub で AMQ Streams Operator をバージョン 2.2 にアップグレードする 前 に、カスタムリソースを v1beta2 にアップグレードする必要があります。
バージョン 1.7 より前の AMQ Streams バージョンからアップグレードする場合は、以下を行います。
- AMQ Streams 1.7 にアップグレードします。
- AMQ Streams ソフトウェアのダウンロードページ から、AMQ Streams 1.8 で提供される Red Hat AMQ Streams API Conversion Tool をダウンロードします。
カスタムリソースおよび CRD を
v1beta2に変換します。詳細は、AMQ Streams 1.7 アップグレードドキュメント を参照してください。
- OperatorHub で、Red Hat Integration - AMQ Streams Operator のバージョン 1.7.0 を削除します。
存在する場合は、Red Hat Integration - AMQ Streams Operator のバージョン 2.1.0 を削除します。
存在しない場合は、次のステップに進みます。
AMQ Streams Operator の Approval Strategy が Automatic に設定されている場合、Operator のバージョン 2.1.0 がすでにクラスターに存在する可能性があります。リリース前にカスタムリソースおよび CRD を
v1beta2API バージョンに 変換しなかった 場合、Operator が管理するカスタムリソースおよび CRD は古い API バージョンを使用します。その結果、2.1.0 Operator は Pending ステータスで停止します。このような場合、Red Hat Integration - AMQ Streams Operator のバージョン 2.1.0 およびバージョン 1.7.0 を削除する必要があります。両方の Operator を削除すると、新しい Operator バージョンがインストールされるまで、調整は一時停止されます。カスタムリソースへの変更が遅延しないように、次の手順を直ちに実行します。
OperatorHub で、Red Hat Integration - AMQ Streams Operator のバージョン 2.1.0 を即時にインストールします。
インストールされた 2.1.0 Operator はクラスターの監視を開始し、ローリング更新を実行します。このプロセス中に、クラスターのパフォーマンスが一時的に低下する場合があります。
9.4.3. インストールファイルを使用した Cluster Operator のアップグレード リンクのコピーリンクがクリップボードにコピーされました!
この手順では、AMQ Streams 2.2 を使用するように Cluster Operator デプロイメントをアップグレードする方法を説明します。
インストール YAML ファイルを使用して Cluster Operator をデプロイした場合は、以下の手順に従います。
Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作による影響を受けません。
特定バージョンの AMQ Streams へのアップグレード方法は、そのバージョンをサポートするドキュメントを参照してください。
前提条件
- 既存の Cluster Operator デプロイメントを利用できる。
- AMQ Streams 2.2 のリリースアーティファクトがダウンロード済みである。
手順
-
既存の Cluster Operator リソース (
/install/cluster-operatorディレクトリー内) に追加した設定変更を書留めておきます。すべての変更は、新しいバージョンの Cluster Operator によって上書きされます。 - カスタムリソースを更新して、AMQ Streams バージョン 2.2 で使用できるサポート対象の設定オプションを反映します。
Cluster Operator を更新します。
Cluster Operator を実行している namespace に従い、新しい Cluster Operator バージョンのインストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow -
既存の Cluster Operator
Deploymentで 1 つ以上の環境変数を編集した場合、install/cluster-operator/060-Deployment-strimzi-cluster-operator.yamlファイルを編集し、これらの環境変数を使用します。
設定を更新したら、残りのインストールリソースとともにデプロイします。
oc replace -f install/cluster-operator
oc replace -f install/cluster-operatorCopy to Clipboard Copied! Toggle word wrap Toggle overflow ローリング更新が完了するのを待ちます。
新しい Operator バージョンがアップグレード元の Kafka バージョンをサポートしなくなった場合、Cluster Operator はバージョンがサポートされていないことを示すエラーメッセージを返します。そうでない場合は、エラーメッセージは返されません。
エラーメッセージが返される場合は、新しい Cluster Operator バージョンでサポートされる Kafka バージョンにアップグレードします。
-
Kafkaカスタムリソースを編集します。 -
spec kafka.versionプロパティーをサポートされる Kafka バージョンに変更します。
-
- エラーメッセージが返されない場合は、次のステップに進みます。Kafka のバージョンを後でアップグレードします。
Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow イメージタグには、新しい Operator のバージョンが表示されます。以下に例を示します。
registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2
registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Cluster Operator はバージョン 2.2 にアップグレードされましたが、管理するクラスターで稼働している Kafka のバージョンは変更されていません。
Cluster Operator のアップグレードの次に、Kafka のアップグレード を実行する必要があります。
9.5. Kafka のアップグレード リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator を 2.2 にアップグレードした後、次にすべての Kafka ブローカーをサポートされる最新バージョンの Kafka にアップグレードします。
Kafka のアップグレードは、Kafka ブローカーのローリング更新によって Cluster Operator によって実行されます。
Cluster Operator は、Kafka クラスターの設定に基づいてローリング更新を開始します。
Kafka.spec.kafka.config に以下が含まれている場合 | Cluster Operator によって開始されるもの |
|---|---|
|
|
単一のローリング更新。更新後、 |
|
| 2 つのローリング更新 |
|
| 2 つのローリング更新 |
Kafka 3.0.0 以降、inter.broker.protocol.version が 3.0 以上に設定されていると、log.message.format.version オプションは無視されるため、設定する必要はありません。ブローカーの log.message.format.version プロパティーおよびトピックの message.format.version プロパティーは、非推奨となり、Kafka の今後のリリースで削除されます。
Cluster Operator は、Kafka のアップグレードの一環として、ZooKeeper のローリング更新を開始します。
- ZooKeeper バージョンが変更されなくても、単一のローリング更新が発生します。
- 新しいバージョンの Kafka に新しいバージョンの ZooKeeper が必要な場合、追加のローリング更新が発生します。
9.5.1. Kafka バージョン リンクのコピーリンクがクリップボードにコピーされました!
Kafka のログメッセージ形式バージョンと inter-broker プロトコルバージョンは、それぞれメッセージに追加されるログ形式バージョンとクラスターで使用される Kafka プロトコルのバージョンを指定します。正しいバージョンが使用されるようにするため、アップグレードプロセスでは、既存の Kafka ブローカーの設定変更と、クライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更が行われます。
以下の表は、Kafka バージョンの違いを示しています。
| Kafka バージョン | Inter-broker プロトコルバージョン | ログメッセージ形式バージョン | ZooKeeper バージョン |
|---|---|---|---|
| 3.2.3 | 3.2 | 3.2 | 3.6.3 |
| 3.1.0 | 3.1 | 3.1 | 3.6.3 |
Inter-broker プロトコルバージョン
Kafka では、Inter-broker の通信に使用されるネットワークプロトコルは Inter-broker プロトコル と呼ばれます。Kafka の各バージョンには、互換性のあるバージョンの Inter-broker プロトコルがあります。上記の表が示すように、プロトコルのマイナーバージョンは、通常 Kafka のマイナーバージョンと一致するように番号が増加されます。
Inter-broker プロトコルのバージョンは、Kafka リソースでクラスター全体に設定されます。これを変更するには、Kafka.spec.kafka.config の inter.broker.protocol.version プロパティーを編集します。
ログメッセージ形式バージョン
プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリース間で変更になる可能性があるため、メッセージにはエンコードに使用されたメッセージ形式のバージョンが指定されます。
特定のメッセージ形式のバージョンを設定するために使用されるプロパティーは以下のとおりです。
-
トピック用の
message.format.versionプロパティー -
Kafka ブローカーの
log.message.format.versionプロパティー
Kafka 3.0.0 以降、メッセージ形式のバージョンの値は inter.broker.protocol.version と一致すると見なされ、設定する必要はありません。値は、使用される Kafka バージョンを反映します。
Kafka 3.0.0 以降にアップグレードする場合は、inter.broker.protocol.version を更新する際にこの設定を削除できます。それ以外の場合は、アップグレード先の Kafka バージョンに基づいてメッセージ形式のバージョンを設定します。
トピックの message.format.version のデフォルト値は、Kafka ブローカーに設定される log.message.format.version によって定義されます。トピックの message.format.version は、トピック設定を編集すると手動で設定できます。
9.5.2. クライアントをアップグレードするストラテジー リンクのコピーリンクがクリップボードにコピーされました!
クライアントアプリケーション (Kafka Connect コネクターを含む) のアップグレードに適切な方法は、特定の状況によって異なります。
消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。
- プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
- ブローカーでメッセージをダウンコンバートする。
ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。
ブローカーのダウンコンバートは 2 通りの方法で設定できます。
-
トピックレベルの
message.format.version。単一のトピックが設定されます。 -
ブローカーレベルの
log.message.format.version。トップレベルのmessage.format.versionが設定されていないトピックのデフォルトです。
新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。
クライアントのアップグレードに使用できる一般的なストラテジーを以下に示します。クライアントアプリケーションをアップグレードするストラテジーは他にもあります。
Kafka 3.0.0 以降にアップグレードすると、各ストラテジーで概説されている手順がわずかに変わります。Kafka 3.0.0 以降、メッセージ形式のバージョンの値は inter.broker.protocol.version と一致すると見なされ、設定する必要はありません。
ブローカーレベルのコンシューマーの最初のストラテジー
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
ブローカーレベル
log.message.format.versionを新バージョンに変更します。 - プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。
トピックレベルのコンシューマーの最初のストラテジー
トピックごとに以下を実行します。
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
トピックレベルの
message.format.versionを新バージョンに変更します。 - プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。
ダウンコンバージョンを使用したトピックレベルのコンシューマーの最初のストラテジー
トピックごとに以下を実行します。
-
トピックレベルの
message.format.versionを、旧バージョンに変更します (または、デフォルトがブローカーレベルのlog.message.format.versionのトピックを利用します)。 - コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
- アップグレードしたアプリケーションが正しく機能することを確認します。
-
トピックレベルの
message.format.versionを新バージョンに変更します。
このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。
この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。
複数のストラテジーを適用することもできます。たとえば、最初のアプリケーションおよびトピックには、"per-topic consumers first, with down conversion" ストラテジーを使用することができます。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。
9.5.3. Kafka バージョンおよびイメージマッピング リンクのコピーリンクがクリップボードにコピーされました!
Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES 環境変数と Kafka.spec.kafka.version プロパティーの設定について考慮してください。
-
それぞれの
KafkaリソースはKafka.spec.kafka.versionで設定できます。 Cluster Operator の
STRIMZI_KAFKA_IMAGES環境変数により、Kafka のバージョンと、指定のKafkaリソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。-
Kafka.spec.kafka.imageを設定しないと、そのバージョンのデフォルトのイメージが使用されます。 -
Kafka.spec.kafka.imageを設定すると、デフォルトのイメージがオーバーライドされます。
-
Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。
9.5.4. Kafka ブローカーおよびクライアントアプリケーションのアップグレード リンクのコピーリンクがクリップボードにコピーされました!
この手順では、AMQ Streams Kafka クラスターを最新のサポートされる Kafka バージョンにアップグレードする方法を説明します。
新しい Kafka バージョンを現在のバージョンと比較すると、新しいバージョンは ログメッセージ形式の上位バージョン、ブローカー間プロトコルの上位バージョン、またはその両方をサポートする可能性があります。必要に応じて、これらのバージョンをアップグレードする手順を実行します。詳細は、「Kafka バージョン」 を参照してください。
クライアントをアップグレードするストラテジー を選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。
前提条件
Kafka リソースをアップグレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.configには、新しい Kafka バージョンでサポートされないオプションが含まれていない。
手順
Kafka クラスター設定を更新します。
oc edit kafka my-cluster
oc edit kafka my-clusterCopy to Clipboard Copied! Toggle word wrap Toggle overflow 設定されている場合、
Kafka.spec.kafka.configにlog.message.format.versionがあり、inter.broker.protocol.versionが現在の Kafka バージョンのデフォルトに設定されていることを確認します。たとえば、Kafka のバージョン 3.1.0 から 3.2.3 へのアップグレードは以下のようになります。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow log.message.format.versionおよびinter.broker.protocol.versionが設定されていない場合、AMQ Streams では、次のステップの Kafka バージョンの更新後、これらのバージョンを現在のデフォルトに自動的に更新します。注記log.message.format.versionおよびinter.broker.protocol.versionの値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka.spec.kafka.versionを変更して、新しい Kafka バージョンを指定します。現在の Kafka バージョンのデフォルトでlog.message.format.versionおよびinter.broker.protocol.versionのままにします。注記kafka.versionを変更すると、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリーの使用が開始されます。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。Inter.broker.protocol.versionを変更しないと、ブローカーはアップグレード中も相互に通信を継続できます。たとえば、Kafka 3.1.0 から 3.2.3 へのアップグレードは以下のようになります。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 警告新しい Kafka バージョンの
inter.broker.protocol.versionが変更された場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsetsに書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。Kafka クラスターのイメージが
Kafka.spec.kafka.imageの Kafka カスタムリソースで定義されている場合、imageを更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。Kafka バージョンおよびイメージマッピング を参照してください。
エディターを保存して終了し、ローリング更新の完了を待ちます。
Pod の状態の遷移を監視して、ローリング更新の進捗を確認します。
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow ローリング更新により、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用するようになります。
クライアントのアップグレードに選択したストラテジー に応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。
必要に応じて、Kafka Connect および MirrorMaker の
versionプロパティーを新バージョンの Kafka として設定します。-
Kafka Connect では、
KafkaConnect.spec.versionを更新します。 -
MirrorMaker では、
KafkaMirrorMaker.spec.versionを更新します。 -
MirrorMaker 2.0 の場合は、
KafkaMirrorMaker2.spec.versionを更新します。
-
Kafka Connect では、
設定されている場合、新しい
inter.broker.protocol.versionバージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。たとえば、Kafka 3.2.3 へのアップグレードは以下のようになります。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Cluster Operator によってクラスターが更新されるまで待ちます。
設定されている場合、新しい
log.message.format.versionバージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に進みます。たとえば、Kafka 3.2.3 へのアップグレードは以下のようになります。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 重要Kafka 3.0.0 以降、
inter.broker.protocol.versionが3.0以上に設定されていると、log.message.format.versionオプションは無視されるため、設定する必要はありません。Cluster Operator によってクラスターが更新されるまで待ちます。
- これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
- ブローカーは、ブローカー間プロトコルバージョンと新バージョンの Kafka のメッセージ形式バージョンを使用して、メッセージを送信するように設定されます。
Kafka のアップグレードに従い、必要な場合は以下を行うことができます。
9.6. コンシューマーの Cooperative Rebalancing へのアップグレード リンクのコピーリンクがクリップボードにコピーされました!
Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。
コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。
Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。
前提条件
- Kafka 3.2.3 に Kafka ブローカーおよびクライアントアプリケーションをアップグレード済み である。
手順
Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードする方法:
-
Kafka クライアント
.jarファイルを新バージョンに置き換えます。 -
コンシューマー設定で、
partition.assignment.strategyにcooperative-stickyを追加します。たとえば、rangeストラテジーが設定されている場合は、設定をrange, cooperative-stickyに変更します。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
-
コンシューマー設定から前述の
partition.assignment.strategyを削除して、グループの各コンシューマーを再設定し、cooperative-stickyストラテジーのみを残します。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードする方法:
-
Kafka Streams の
.jarファイルを新バージョンに置き換えます。 -
Kafka Streams の設定で、
upgrade.from設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。 - 各ストリームプロセッサー (ノード) を順次再起動します。
-
upgrade.from設定パラメーターを Kafka Streams 設定から削除します。 - グループ内の各コンシューマーを順次再起動します。
第10章 AMQ Streams のダウングレード リンクのコピーリンクがクリップボードにコピーされました!
アップグレードしたバージョンの AMQ Streams で問題が発生した場合は、インストールを直前のバージョンに戻すことができます。
YAML インストールファイルを使用して AMQ Streams をインストールした場合は、以前のリリースから YAML インストールファイルを使用して、以下のダウングレード手順を実行できます。
以前のバージョンの AMQ Streams では使用している Kafka バージョンがサポートされない場合、メッセージに追加されるログメッセージ形式のバージョンが一致すれば Kafka をダウングレードすることができます。
別のインストール方法を使用して AMQ Streams をデプロイした場合は、サポートされるアプローチを使用して AMQ Streams をダウングレードします。ここでは、ダウングレードの手順を使用しないでください。たとえば、Operator Lifecycle Manager (OLM) を使用して AMQ Streams をインストールした場合、デプロイチャネルを以前のバージョンの AMQ Streams に変更することでダウングレードできます。
10.1. Cluster Operator の以前のバージョンへのダウングレード リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams で問題が発生した場合は、インストールを元に戻すことができます。
この手順では、Cluster Operator デプロイメントを以前のバージョンにダウングレードする方法を説明します。
前提条件
- 既存の Cluster Operator デプロイメントを利用できる。
- 以前のバージョンのインストールファイルがダウンロード済み である必要があります。
手順
-
既存の Cluster Operator リソース (
/install/cluster-operatorディレクトリー内) に追加した設定変更を書留めておきます。すべての変更は、以前のバージョンの Cluster Operator によって上書きされます。 - カスタムリソースを元に戻して、ダウングレードする AMQ Streams バージョンで利用可能なサポート対象の設定オプションを反映します。
Cluster Operator を更新します。
Cluster Operator を実行している namespace に従い、以前のバージョンのインストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yaml
sed -i '' 's/namespace: .*/namespace: <my_cluster_operator_namespace>/' install/cluster-operator/*RoleBinding*.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow -
既存の Cluster Operator
Deploymentで 1 つ以上の環境変数を編集した場合、install/cluster-operator/060-Deployment-strimzi-cluster-operator.yamlファイルを編集し、これらの環境変数を使用します。
設定を更新したら、残りのインストールリソースとともにデプロイします。
oc replace -f install/cluster-operator
oc replace -f install/cluster-operatorCopy to Clipboard Copied! Toggle word wrap Toggle overflow ローリング更新が完了するのを待ちます。
Kafka Pod のイメージを取得して、ダウングレードが正常に完了したことを確認します。
oc get pod my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'oc get pod my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow イメージタグには、新しい AMQ Streams バージョンと Kafka バージョンが順に示されます。例:
NEW-STRIMZI-VERSION-kafka-CURRENT-KAFKA-VERSION
Cluster Operator は以前のバージョンにダウングレードされました。
10.2. Kafka のダウングレード リンクのコピーリンクがクリップボードにコピーされました!
Kafka バージョンのダウングレードは、Cluster Operator によって実行されます。
10.2.1. ダウングレードでの Kafka バージョンの互換性 リンクのコピーリンクがクリップボードにコピーされました!
Kafka のダウングレードは、互換性のある現在およびターゲットの Kafka バージョン と、メッセージがログに記録された状態に依存します。
そのバージョンが、クラスターでこれまで使用された inter.broker.protocol.version 設定をサポートしない場合、または新しい log.message.format.version を使用するメッセージログにメッセージが追加された場合は、下位バージョンの Kafka に戻すことはできません。
Inter.broker.protocol.version は、__consumer_offsets に書き込まれたメッセージのスキーマなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。クラスターで以前使用された inter.broker.protocol.version が認識されない Kafka バージョンにダウングレードすると、ブローカーが認識できないデータが発生します。
ダウングレードする Kafka のバージョンの関係は次のとおりです。
-
ダウングレードする Kafka バージョンの
log.message.format.versionが現行バージョンと 同じ である場合、Cluster Operator は、ブローカーのローリング再起動を 1 回実行してダウングレードを行います。 別の
log.message.format.versionの場合、ダウングレード後の Kafka バージョンが使用するバージョンに設定されたlog.message.format.versionが常に 実行中のクラスターに存在する場合に限り、ダウングレードが可能です。通常は、アップグレードの手順がlog.message.format.versionの変更前に中止された場合にのみ該当します。その場合、ダウングレードには以下が必要です。- 2 つのバージョンで Interbroker プロトコルが異なる場合、ブローカーのローリング再起動が 2 回必要です。
- 両バージョンで同じ場合は、ローリング再起動が 1 回必要です。
以前のバージョンでサポートされない log.message.format.version が新バージョンで使われていた場合 (log.message.format.version のデフォルト値が使われていた場合など)、ダウングレードは実行 できません。たとえば以下のリソースの場合、log.message.format.version が変更されていないので、Kafka バージョン 3.1.0 にダウングレードできます。
log.message.format.version が "3.2" に設定されているか、値がない (このためパラメーターに 3.2.3 ブローカーのデフォルト値 3.2 が採用される) 場合は、ダウングレードを実施できません。
Kafka 3.0.0 以降、inter.broker.protocol.version が 3.0 以上に設定されていると、log.message.format.version オプションは無視されるため、設定する必要はありません。
10.2.2. Kafka ブローカーおよびクライアントアプリケーションのダウングレード リンクのコピーリンクがクリップボードにコピーされました!
この手順では、AMQ Streams Kafka クラスターを Kafka の下位 (以前の) バージョンにダウングレードする方法 (3.2.3 から 3.1.0 へのダウングレードなど) を説明します。
前提条件
AMQ Streams Kafka クラスターをダウングレードする前に、Kafka リソースについて以下を確認してください。
- 重要: Kafka バージョンの互換性。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.configに、ダウングレードする Kafka バージョンでサポートされていないオプションが含まれていない。 Kafka.spec.kafka.configに、ダウングレード先の Kafka バージョンでサポートされるlog.message.format.versionとinter.broker.protocol.versionがある。Kafka 3.0.0 以降、
inter.broker.protocol.versionが3.0以上に設定されていると、log.message.format.versionオプションは無視されるため、設定する必要はありません。
手順
Kafka クラスター設定を更新します。
oc edit kafka KAFKA-CONFIGURATION-FILE
oc edit kafka KAFKA-CONFIGURATION-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka.spec.kafka.versionを変更して、以前のバージョンを指定します。たとえば、Kafka 3.2.3 から 3.1.0 へのダウングレードは以下のようになります。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記log.message.format.versionおよびinter.broker.protocol.versionの値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka バージョンのイメージが Cluster Operator の
STRIMZI_KAFKA_IMAGESに定義されているイメージとは異なる場合は、Kafka.spec.kafka.imageを更新します。エディターを保存して終了し、ローリング更新の完了を待ちます。
更新をログで確認するか、Pod 状態の遷移を監視して確認します。
oc logs -f CLUSTER-OPERATOR-POD-NAME | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
oc logs -f CLUSTER-OPERATOR-POD-NAME | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get pod -w
oc get pod -wCopy to Clipboard Copied! Toggle word wrap Toggle overflow Cluster Operator ログで
INFOレベルのメッセージを確認します。Reconciliation #NUM(watch) Kafka(NAMESPACE/NAME): Kafka version downgrade from FROM-VERSION to TO-VERSION, phase 1 of 1 completed
Reconciliation #NUM(watch) Kafka(NAMESPACE/NAME): Kafka version downgrade from FROM-VERSION to TO-VERSION, phase 1 of 1 completedCopy to Clipboard Copied! Toggle word wrap Toggle overflow すべてのクライアントアプリケーション (コンシューマー) をダウングレードして、以前のバージョンのクライアントバイナリーを使用します。
これで、Kafka クラスターおよびクライアントは以前の Kafka バージョンを使用するようになります。
トピックメタデータの保存に ZooKeeper を使用する 1.7 よりも前のバージョンの AMQ Streams に戻す場合は、Kafka クラスターから内部トピックストアのトピックを削除します。
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.2 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --deleteCopy to Clipboard Copied! Toggle word wrap Toggle overflow
第11章 AMQ Streams のアンインストール リンクのコピーリンクがクリップボードにコピーされました!
OpenShift Container Platform Web コンソールまたは CLI を使用して、OperatorHub から OpenShift 4.8 から 4.11 への AMQ Streams をアンインストールできます。
AMQ Streams のインストールに使用したのと同じアプローチを使用します。
AMQ Streams をアンインストールする場合は、デプロイメント専用に作成され、AMQ Streams リソースから参照されるリソースを特定する必要があります。
このようなリソースには以下があります。
- シークレット (カスタム CA および証明書、Kafka Connect Secrets、その他の Kafka シークレット)
-
ロギング
ConfigMap(externalタイプ)
Kafka、KafkaConnect、KafkaMirrorMaker、KafkaBridge のいずれかの設定で参照されるリソースです。
CustomResourceDefinitions を削除すると、対応するカスタムリソース (Kafka、KafkaConnect、KafkaMirrorMaker、または KafkaBridge)、およびそれらに依存するリソース (Deployments、StatefulSets、およびその他の依存リソース) のガベージコレクションが実行されます。
11.1. Web コンソールを使用した OperatorHub からの AMQ Streams のアンインストール リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OperatorHub から AMQ Streams をアンインストールし、デプロイメントに関連するリソースを削除する方法を説明します。
コンソールから手順を実行したり、別の CLI コマンドを使用したりできます。
前提条件
-
cluster-adminまたはstrimzi-adminパーミッションを持つアカウントを使用して OpenShift Container Platform Web コンソールにアクセスできる。 削除するリソースを特定している。
AMQ Streams をアンインストールしたら、以下の
ocCLI コマンドを使用してリソースを検索して、削除されていることを確認できます。AMQ Streams デプロイメントに関連するリソースを検索するコマンド
oc get <resource_type> --all-namespaces | grep <kafka_cluster_name>
oc get <resource_type> --all-namespaces | grep <kafka_cluster_name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow <resource_type> は、
secretまたはconfigmapなどのチェックするリソースのタイプに置き換えます。
手順
- OpenShift Web コンソールで、Operators > Installed Operators に移動します。
Red Hat Integration - AMQ Streams Operator がインストールされている場合には、オプションアイコン (3 つの縦の点) を選択し、Uninstall Operator をクリックします。
Operator が Installed Operators から削除されます。
- Home > Projects に移動し、AMQ Streams と Kafka コンポーネントがインストールされているプロジェクトを選択します。
Inventory のオプションをクリックして関連リソースを削除します。
リソースには以下が含まれます。
- Deployments
- StatefulSets
- Pod
- Services
- ConfigMap
- Secrets
ヒント検索を使用して、Kafka クラスターの名前で始まる関連リソースを検索します。また、Workloads でもリソースを検索できます。
代わりの CLI コマンド
CLI コマンドを使用して、OperatorHub から AMQ Streams をアンインストールできます。
AMQ Streams サブスクリプションを削除します。
oc delete subscription amq-streams -n openshift-operators
oc delete subscription amq-streams -n openshift-operatorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow クラスターサービスバージョン (CSV) を削除します。
oc delete csv amqstreams.<version> -n openshift-operators
oc delete csv amqstreams.<version> -n openshift-operatorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow 関連する CRD を削除します。
oc get crd -l app=strimzi -o name | xargs oc delete
oc get crd -l app=strimzi -o name | xargs oc deleteCopy to Clipboard Copied! Toggle word wrap Toggle overflow
11.2. CLI を使用した AMQ Streams のアンインストール リンクのコピーリンクがクリップボードにコピーされました!
この手順では、oc コマンドラインツールを使用して AMQ Streams をアンインストールし、デプロイメントに関連するリソースを削除する方法を説明します。
前提条件
-
cluster-adminまたはstrimzi-admin権限を持つアカウントを使用して、OpenShift クラスターにアクセスできる。 削除するリソースを特定している。
AMQ Streams をアンインストールしたら、以下の
ocCLI コマンドを使用してリソースを検索して、削除されていることを確認できます。AMQ Streams デプロイメントに関連するリソースを検索するコマンド
oc get <resource_type> --all-namespaces | grep <kafka_cluster_name>
oc get <resource_type> --all-namespaces | grep <kafka_cluster_name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow <resource_type> は、
secretまたはconfigmapなどのチェックするリソースのタイプに置き換えます。
手順
Cluster Operator
Deployment、関連するCustomResourceDefinitions、およびRBACリソースを削除します。Cluster Operator のデプロイに使用するインストールファイルを指定します。
oc delete -f install/cluster-operator
oc delete -f install/cluster-operatorCopy to Clipboard Copied! Toggle word wrap Toggle overflow 前提条件で特定したリソースを削除します。
oc delete <resource_type> <resource_name> -n <namespace>
oc delete <resource_type> <resource_name> -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow <resource_type> は削除するリソースのタイプに、 <resource_name> はリソースの名前に置き換えます。
シークレットの削除例
oc delete secret my-cluster-clients-ca -n my-project
oc delete secret my-cluster-clients-ca -n my-projectCopy to Clipboard Copied! Toggle word wrap Toggle overflow
第12章 AMQ Streams でのメータリングの使用 リンクのコピーリンクがクリップボードにコピーされました!
OCP 4 で利用可能なメータリングツールを使用して、異なるデータソースからメータリングレポートを生成できます。クラスター管理者として、メータリングを使用してクラスターの内容を分析できます。独自のクエリーを作成するか、事前定義 SQL クエリーを使用して、利用可能な異なるデータソースからデータを処理する方法を定義できます。Prometheus をデフォルトのデータソースとして使用すると、Pod、namespace、およびその他ほとんどの OpenShift リソースのレポートを生成できます。
OpenShift のメータリング Operator を使用すると、インストールされた AMQ Streams コンポーネントを分析し、Red Hat サブスクリプションに準拠しているかどうかを判断できます。
AMQ Streams でメータリングを使用するには、まず OpenShift Container Platform に メータリング Operator をインストールし、設定する必要があります。
12.1. メータリングリソース リンクのコピーリンクがクリップボードにコピーされました!
メータリングには、メータリングのデプロイメントやインストール、およびメータリングが提供するレポート機能を管理するために使用できるリソースが多数含まれています。メータリングは以下の CRD を使用して管理されます。
| 名前 | 説明 |
|---|---|
|
| デプロイメントのメータリングスタックを設定します。メータリングスタック設定用の各コンポーネントを制御するカスタマイズおよび設定オプションが含まれます。 |
|
| 使用するクエリー、クエリーを実行するタイミングおよび頻度、および結果を保存する場所を制御します。 |
|
|
|
|
| ReportQuery および Report で利用可能なデータを制御します。メータリング内で使用できるように複数の異なるデータベースへのアクセスの設定を可能にします。 |
12.2. AMQ Streams のメータリングラベル リンクのコピーリンクがクリップボードにコピーされました!
以下の表では、AMQ Streams インフラストラクチャーコンポーネントおよびインテグレーションのメータリングラベルがリスト表示されています。
| ラベル | 使用できる値 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| インフラストラクチャー
|
| アプリケーション
| |
|
|
|
例
インフラストラクチャーの例 (インフラストラクチャーコンポーネントが entity-operator の場合)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow アプリケーションの例 (インテグレーションのデプロイメント名が kafka-bridge の場合)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
付録A サブスクリプションの使用 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams は、ソフトウェアサブスクリプションから提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。
アカウントへのアクセス
- access.redhat.com に移動します。
- アカウントがない場合は作成します。
- アカウントにログインします。
サブスクリプションのアクティベート
- access.redhat.com に移動します。
- My Subscriptions に移動します。
- Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。
Zip および Tar ファイルのダウンロード
zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合、この手順は必要ありません。
- ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
- INTEGRATION AND AUTOMATION カテゴリーで、AMQ Streams for Apache Kafka エントリーを見つけます。
- 必要な AMQ Streams 製品を選択します。Software Downloads ページが開きます。
- コンポーネントの Download リンクをクリックします。
DNF を使用したパッケージのインストール
パッケージとすべてのパッケージ依存関係をインストールするには、以下を使用します。
dnf install <package_name>
dnf install <package_name>
ローカルディレクトリーからダウンロード済みのパッケージをインストールするには、以下を使用します。
dnf install <path_to_download_package>
dnf install <path_to_download_package>
改訂日時: 2023-10-24