第5章 AMQ Streams operator の使用
AMQ Streams の operator を使用して Kafka クラスターと Kafka トピックおよびユーザーを管理します。
5.1. Cluster Operator の使用 リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は Kafka クラスターや他の Kafka コンポーネントをデプロイするために使用されます。
Cluster Operator は YAML インストールファイルを使用してデプロイされます。
OpenShift では、Kafka Connect デプロイメントに Source2Image 機能を組み込み、追加のコネクターを加えるための便利な方法として利用できます。
5.1.1. Cluster Operator の設定 リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は、サポートされる環境変数を使用してロギング設定から設定できます。
環境変数は、Cluster Operator イメージのデプロイメンのコンテナー設定に関連します。image 設定の詳細は、「image」 を参照してください。
STRIMZI_NAMESPACEOperator が操作する namespace のカンマ区切りのリスト。設定されていない場合や、空の文字列や
*に設定された場合は、Cluster Operator はすべての namespace で操作します。Cluster Operator デプロイメントでは OpenShift Downward API を使用して、これを Cluster Operator がデプロイされる namespace に自動設定することがあります。Cluster Operator namespace の設定例
env: - name: STRIMZI_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceenv: - name: STRIMZI_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceCopy to Clipboard Copied! Toggle word wrap Toggle overflow -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS - 任意設定、デフォルトは 120000 ミリ秒です。定期的な調整の間隔 (秒単位)。
STRIMZI_OPERATION_TIMEOUT_MS- 任意設定、デフォルトは 300000 ミリ秒です。内部操作のタイムアウト (ミリ秒単位)。この値は、標準の OpenShift 操作の時間が通常よりも長いクラスターで (Docker イメージのダウンロードが遅い場合など) AMQ Streams を使用する場合に増やす必要があります。
STRIMZI_OPERATOR_NAMESPACEAMQ Streams Cluster Operator が稼働している namespace の名前。この変数は手動で設定しないでください。OpenShift Downward API を使用します。
env: - name: STRIMZI_OPERATOR_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceenv: - name: STRIMZI_OPERATOR_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceCopy to Clipboard Copied! Toggle word wrap Toggle overflow STRIMZI_OPERATOR_NAMESPACE_LABELS任意設定。AMQ Streams Cluster Operator が稼働している namespace のラベル。namespace ラベルは、ネットワークポリシーで namespace セレクターを設定するために使用されます。これにより、AMQ Streams Cluster Operator はこれらのラベルを持つ namespace からのオペランドのみにアクセスできます。設定されていない場合、ネットワークポリシーの namespace セレクターは、OpenShift クラスターのすべての namespace から AMQ Streams Cluster Operator にアクセスできるように設定されます。
env: - name: STRIMZI_OPERATOR_NAMESPACE_LABELS value: label1=value1,label2=value2env: - name: STRIMZI_OPERATOR_NAMESPACE_LABELS value: label1=value1,label2=value2Copy to Clipboard Copied! Toggle word wrap Toggle overflow STRIMZI_CUSTOM_RESOURCE_SELECTOR任意設定。Operator によって処理されるカスタムリソースのフィルタリングに使用されるラベルセレクターを指定します。Operator は、指定されたラベルが設定されているカスタムリソースでのみ動作します。これらのラベルのないリソースは Operator によって認識されません。ラベルセレクターは、
Kafka、KafkaConnect、KafkaConnectS2I、KafkaBridge、KafkaMirrorMaker、およびKafkaMirrorMaker2リソースに適用されます。KafkaRebalanceおよびKafkaConnectorリソースは、対応する Kafka および Kafka Connect クラスターに一致するラベルがある場合にのみ操作されます。env: - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR value: label1=value1,label2=value2env: - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR value: label1=value1,label2=value2Copy to Clipboard Copied! Toggle word wrap Toggle overflow STRIMZI_KAFKA_IMAGES-
必須。Kafka バージョンから、そのバージョンの Kafka ブローカーが含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、Kafka.spec.kafka.versionプロパティーが指定され、KafkaリソースのKafka.spec.kafka.imageは指定されていない場合に使用されます。 STRIMZI_DEFAULT_KAFKA_INIT_IMAGE-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0です。Kafkaリソースでkafka-init-imageとして指定されたイメージがない場合に、初期設定作業 (ラックサポート) のブローカーの前に開始される init コンテナーのデフォルトとして使用するイメージ名。 STRIMZI_KAFKA_CONNECT_IMAGES-
必須。Kafka バージョンから、そのバージョンの Kafka Connect が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、KafkaConnect.spec.versionプロパティーが指定され、KafkaConnect.spec.imageは指定されていない場合に使用されます。 STRIMZI_KAFKA_CONNECT_S2I_IMAGES-
必須。Kafka バージョンから、そのバージョンの Kafka Connect が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、KafkaConnectS2I.spec.versionプロパティーが指定され、KafkaConnectS2I.spec.imageは指定されていない場合に使用されます。 STRIMZI_KAFKA_MIRROR_MAKER_IMAGES-
必須。Kafka バージョンから、そのバージョンの Kafka Mirror Maker が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、KafkaMirrorMaker.spec.versionプロパティーが指定され、KafkaMirrorMaker.spec.imageは指定されていない場合に使用されます。 STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0です。KafkaリソースにKafka.spec.entityOperator.topicOperator.imageとして指定されたイメージがない場合に、Topic Operator のデプロイ時にデフォルトとして使用するイメージ名。 STRIMZI_DEFAULT_USER_OPERATOR_IMAGE-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0です。KafkaリソースにKafka.spec.entityOperator.userOperator.imageとして指定されたイメージがない場合に、User Operator のデプロイ時にデフォルトとして使用するイメージ名。 STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0です。KafkaリソースでKafka.spec.entityOperator.tlsSidecar.imageとして指定されたイメージがない場合に、Entity Operator の TLS サポートを提供するサイドカーコンテナーをデプロイする際にデフォルトとして使用するイメージ名。 STRIMZI_IMAGE_PULL_POLICY-
任意設定。AMQ Streams の Cluster Operator によって管理されるすべての Pod のコンテナーに適用される
ImagePullPolicy。有効な値は、Always、IfNotPresent、およびNeverです。指定のない場合、OpenShift のデフォルトが使用されます。ポリシーを変更すると、すべての Kafka、Kafka Connect、および Kafka MirrorMaker クラスターのローリングアップデートが実行されます。 STRIMZI_IMAGE_PULL_SECRETS-
任意設定。
Secret名のカンマ区切りのリスト。ここで参照されるシークレットには、コンテナーイメージがプルされるコンテナーレジストリーへのクレデンシャルが含まれます。シークレットは、Cluster Operator によって作成されるすべてのPodsのimagePullSecretsフィールドで使用されます。このリストを変更すると、Kafka、Kafka Connect、および Kafka MirrorMaker のすべてのクラスターのローリングアップデートが実行されます。 STRIMZI_KUBERNETES_VERSION任意設定。API サーバーから検出された Kubernetes バージョン情報をオーバーライドします。
Kubernetes バージョンオーバーライドの設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KUBERNETES_SERVICE_DNS_DOMAIN任意設定。デフォルトの OpenShift DNS サフィックスを上書きします。
デフォルトでは、OpenShfit クラスターで割り当てられるサービスに、デフォルトのサフィックス
cluster.localを使用する DNS ドメイン名があります。ブローカーが kafka-0 の場合の例は次のとおりです。
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.localCopy to Clipboard Copied! Toggle word wrap Toggle overflow DNS ドメイン名は、ホスト名の検証に使用される Kafka ブローカー証明書に追加されます。
クラスターで異なる DNS サフィックスを使用している場合、Kafka ブローカーとの接続を確立するために、
KUBERNETES_SERVICE_DNS_DOMAIN環境変数をデフォルトから現在使用中の DNS サフィックスに変更します。STRIMZI_CONNECT_BUILD_TIMEOUT_MS- 任意設定、デフォルトは 300000 ミリ秒です。追加のコネクターで新しい Kafka Connect イメージをビルドする場合のタイムアウト (ミリ秒単位)。AMQ Streams を使用して多くのコネクターが含まれるコンテナーイメージをビルドしたり、低速なコンテナーレジストリーを使用する場合は、この値を大きくする必要があります。
5.1.1.1. ConfigMap による設定のロギング リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator のロギングは strimzi-cluster-operator ConfigMap によって設定されます。
ロギング設定を含む ConfigMap は、Cluster Operator のインストール時に作成されます。この ConfigMap は、ファイル install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml に記述されています。Cluster Operator のロギングを設定するには、この ConfigMap のデータフィールド log4j2.properties を変更します。
ロギング設定を更新するには、050-ConfigMap-strimzi-cluster-operator.yaml ファイルを編集して以下のコマンドを実行します。
oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
ConfigMap を直接編集することもできます。
oc edit cm strimzi-cluster-operator
oc edit cm strimzi-cluster-operator
リロード間隔の頻度を変更するには、作成された ConfigMap の monitorInterval オプションで、秒単位の時間を設定します。
Cluster Operator のデプロイ時に ConfigMap がない場合、デフォルトのロギング値が使用されます。
Cluster Operator のデプロイ後に ConfigMap が誤って削除された場合、最後にロードされたロギング設定が使用されます。ConfigMap を新たに作成し、新しいロギング設定を読み込みます。
ConfigMap から monitorInterval オプションを削除しないでください。
5.1.1.2. ネットワークポリシーによる Cluster Operator アクセスの制限 リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は、管理するリソースと同じ namespace または別の namespace で実行できます。デフォルトでは、STRIMZI_OPERATOR_NAMESPACE 環境変数は、OpenShift Downward API を使用して Cluster Operator が稼働している namespace を検索するように設定されます。Cluster Operator がリソースと同じ namespace で実行されている場合は、ローカルアクセスのみが必要で、AMQ Sreams によって許可されます。
Cluster Operator が管理するリソースとは別の namespace で実行されている場合、ネットワークポリシーが設定されている場合を除き、OpenShift クラスターのすべての namespace は Cluster Operator へのアクセスが許可されます。任意の STRIMZI_OPERATOR_NAMESPACE_LABELS 環境変数を使用し、namespace ラベルを使用して Cluster Operator のネットワークポリシーを確立します。namespace ラベルを追加すると、Cluster Operator へのアクセスは指定された namespace に限定されます。
Cluster Operator デプロイメントに設定されたネットワークポリシー
5.1.1.3. 定期的な調整 リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は OpenShift クラスターから受信する必要なクラスターリソースに関するすべての通知に対応しますが、Operator が実行されていない場合や、何らかの理由で通知が受信されない場合、必要なリソースは実行中の OpenShift クラスターの状態と同期しなくなります。
フェイルオーバーを適切に処理するために、Cluster Operator によって定期的な調整プロセスが実行され、必要なリソースすべてで一貫した状態になるように、必要なリソースの状態を現在のクラスターデプロイメントと比較できます。[STRIMZI_FULL_RECONCILIATION_INTERVAL_MS] 変数を使用して、定期的な調整の時間間隔を設定できます。
5.1.2. ロールベースアクセス制御 (RBAC) のプロビジョニング リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator が機能するには、Kafka、KafkaConnect などのリソースや ConfigMaps、Pods、Deployments、StatefulSets、および Services の管理リソースと対話するために OpenShift クラスター内でパーミッションが必要になり ます。このようなパーミッションは、OpenShift のロールベースアクセス制御 (RBAC) リソースに記述されます。
-
ServiceAccount -
RoleおよびClusterRole -
RoleBindingおよびClusterRoleBinding
Cluster Operator は、ClusterRoleBinding を使用して独自の ServiceAccount で実行される他に、OpenShift リソースへのアクセスを必要とするコンポーネントの RBAC リソースを管理します。
また OpenShift には、ServiceAccount で動作するコンポーネントが、その ServiceAccount にはない他の ServiceAccounts の権限を付与しないようにするための特権昇格の保護機能も含まれています。Cluster Operator は、ClusterRoleBindings と、それが管理するリソースで必要な RoleBindings を作成できる必要があるため、Cluster Operator にも同じ権限が必要です。
5.1.2.1. 委譲された権限 リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator が必要な Kafka リソースのリソースをデプロイする場合、以下のように ServiceAccounts、RoleBindings、および ClusterRoleBindings も作成します。
Kafka ブローカー Pod は
cluster-name-kafkaというServiceAccountを使用します。-
ラック機能が使用されると、
strimzi-cluster-name-kafka-initClusterRoleBindingは、strimzi-kafka-brokerと呼ばれるClusterRole経由で、クラスター内のノードへのServiceAccountアクセスを付与するために使用されます。 - ラック機能が使用されていない場合は、バインディングは作成されません。
-
ラック機能が使用されると、
-
ZooKeeper Pod は
cluster-name-zookeeperというServiceAccountを使用します。 Entity Operator は、
cluster-name-entity-operatorというServiceAccountを使用します。-
Topic Operator はステータス情報のある OpenShift イベントを生成し、
ServiceAccountがstrimzi-entity-operatorというClusterRoleにバインドされるようにします。strimzi-entity-operatorはこのアクセス権限をstrimzi-entity-operatorRoleBinding経由で付与します。
-
Topic Operator はステータス情報のある OpenShift イベントを生成し、
-
KafkaConnectおよびKafkaConnectS2Iリソースの Pod はcluster-name-cluster-connectというServiceAccountを使用します。 -
KafkaMirrorMakerの Pod はcluster-name-mirror-makerというServiceAccountを使用します。 -
KafkaMirrorMaker2の Pod はcluster-name-mirrormaker2というServiceAccountを使用します。 -
KafkaBridgeの Pod はcluster-name-bridgeというServiceAccountを使用します。
5.1.2.2. ServiceAccount リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は ServiceAccount を使用して最適に実行されます。
Cluster Operator の ServiceAccount の例
その後、Cluster Operator の Deployment で、これを spec.template.spec.serviceAccountName に指定する必要があります。
Cluster Operator の Deployment の部分的な例
strimzi-cluster-operator ServiceAccount が serviceAccountName として指定されている 12 行目に注目してください。
5.1.2.3. ClusterRoles リンクのコピーリンクがクリップボードにコピーされました!
Cluster Operator は、必要なリソースへのアクセス権限を付与する ClusterRoles を使用して操作する必要があります。OpenShift クラスターの設定によっては、クラスター管理者が ClusterRoles を作成する必要があることがあります。
クラスター管理者の権限は ClusterRoles の作成にのみ必要です。Cluster Operator はクラスター管理者アカウントで実行されません。
ClusterRoles は、 最小権限の原則に従い、Kafka、Kafka Connect、および ZooKeeper クラスターを操作するために Cluster Operator が必要とする権限のみが含まれます。最初に割り当てられた一連の権限により、Cluster Operator で StatefulSets、Deployments、Pods、および ConfigMaps などの OpenShift リソースを管理できます。
Cluster Operator は ClusterRoles を使用して、namespace スコープリソースのレベルおよびクラスタースコープリソースのレベルで権限を付与します。
Cluster Operator の namespaced リソースのある ClusterRole
2 番目の一連の権限には、クラスタースコープリソースに必要な権限が含まれます。
Cluster Operator のクラスタースコープリソースのある ClusterRole
strimzi-kafka-broker ClusterRole は、ラック機能に使用される Kafka Pod の init コンテナーが必要とするアクセス権限を表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。
Cluster Operator の ClusterRole により、OpenShift ノードへのアクセスを Kafka ブローカー Pod に委譲できます。
strimzi-topic-operator の ClusterRole は、Topic Operator が必要とするアクセスを表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。
Cluster Operator の ClusterRole により、イベントへのアクセスを Topic Operator に委譲できます。
strimzi-kafka-client ClusterRole は、クライアントのラックアウェアネスを使用する Kafka クライアントをベースとするコンポーネントが必要なアクセスを表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。
Cluster Operator の ClusterRole により、OpenShift ノードへのアクセスを Kafka クライアントベースの Pod に委譲できます。
5.1.2.4. ClusterRoleBindings リンクのコピーリンクがクリップボードにコピーされました!
Operator には ClusterRoleBindings と、ClusterRole をServiceAccount に関連付ける RoleBindings が必要です。ClusterRoleBindings は、クラスタースコープリロースが含まれる ClusterRoles に必要です。
Cluster Operator の ClusterRoleBinding の例
ClusterRoleBindings は、委譲に必要な ClusterRoles にも必要です。
Kafka ブローカーラックアウェアネスの Cluster Operator の ClusterRoleBinding 例
以下も必要です。
Kafka クライアントラックアウェアネスの Cluster Operator の ClusterRoleBinding 例
namespaced リソースのみが含まれる ClusterRoles は、RoleBindings のみを使用してバインドされます。
5.2. Topic Operator の使用 リンクのコピーリンクがクリップボードにコピーされました!
KafkaTopic リソースを使用してトピックを作成、編集、または削除する場合、Topic Operator によって変更が確実に Kafka クラスターで反映されます。
『OpenShift での AMQ Streams のデプロイおよびアップグレード』には、Topic Operator をデプロイする手順が記載されています。
5.2.1. Kafka トピックリソース リンクのコピーリンクがクリップボードにコピーされました!
KafkaTopic リソースは、パーティションやレプリカの数を含む、トピックの設定に使用されます。
KafkaTopic の完全なスキーマは、「KafkaTopic スキーマ参照」で確認できます。
5.2.1.1. トピック処理用の Kafka クラスターの特定 リンクのコピーリンクがクリップボードにコピーされました!
KafkaTopic リソースには、このリソースが属する Kafka クラスターに適した名前 (Kafka リソースの名前から派生) を定義するラベルが含まれています。
以下に例を示します。
ラベルは、KafkaTopic リソースを特定し、新しいトピックを作成するために、Topic Operator によって使用されます。また、以降のトピックの処理でも使用されます。
ラベルが Kafka クラスターと一致しない場合、Topic Operator は KafkaTopic を識別できず、トピックは作成されません。
5.2.1.2. Kafka トピックの使用に関する推奨事項 リンクのコピーリンクがクリップボードにコピーされました!
トピックを使用する場合は、整合性を保ちます。常に KafkaTopic リソースで作業を行うか、直接 OpenShift でトピックを扱います。特定のトピックで、両方の方法を頻繁に切り替えないでください。
トピックの性質を反映するトピック名を使用し、後で名前を変更できないことに注意してください。
Kafka でトピックを作成する場合は、有効な OpenShift リソース名である名前を使用します。それ以外の場合は、Topic Operator は対応する KafkaTopic を OpenShift ルールに準じた名前で作成する必要があります。
OpenShift の識別子および名前の推奨事項については、OpenShift コミュニティーの記事「Identifiers and Names」を参照してください。
5.2.1.3. Kafka トピックの命名規則 リンクのコピーリンクがクリップボードにコピーされました!
Kafka と OpenShift では、Kafka と KafkaTopic.metadata.name でのトピックの命名にそれぞれ独自の検証ルールを適用します。トピックごとに有効な名前があり、他のトピックには無効です。
spec.topicName プロパティーを使用すると、OpenShift の Kafka トピックでは無効な名前を使用して、Kafka で有効なトピックを作成できます。
spec.topicName プロパティーは Kafka の命名検証ルールを継承します。
- 249 文字を超える名前は使用できません。
-
Kafka トピックの有効な文字は ASCII 英数字、
.、_、および-です。 -
名前を
.または..にすることはできませんが、.はexampleTopic.や.exampleTopicのように名前で使用できます。
spec.topicName は変更しないでください。
以下に例を示します。
- 1
- OpenShift では大文字は無効です。
上記は下記のように変更できません。
Kafka Streams など一部の Kafka クライアントアプリケーションは、プログラムを使用して Kafka でトピックを作成できます。これらのトピックに、OpenShift リソース名としては無効な名前が付いている場合、Topic Operator はそれらのトピックに Kafka 名に基づく有効な metadata.name を付けます。無効な文字が置き換えられ、ハッシュが名前に追加されます。以下に例を示します。
5.2.2. Topic Operator のトピックストア リンクのコピーリンクがクリップボードにコピーされました!
Topic Operator は Kafka を使用して、トピック設定をキーと値のペアとして記述するトピックメタデータを保存します。トピックストアは、Kafka トピックを使用して状態を永続化する Kafka Streams のキーバリューメカニズムを基にしています。
トピックメタデータはインメモリーでキャッシュされ、Topic Operator 内にてローカルでアクセスされます。ローカルのインメモリーキャッシュに適用される操作からの更新は、ディスク上のバックアップトピックストアに永続化されます。トピックストアは、Kafka トピックまたは OpenShift KafkaTopic カスタムリソースからの更新と継続的に同期されます。操作は、このような方法で設定されたトピックストアで迅速に処理されますが、インメモリーキャッシュがクラッシュした場合は、永続ストレージから自動的にデータが再入力されます。
5.2.2.1. 内部トピックストアトピック リンクのコピーリンクがクリップボードにコピーされました!
内部トピックは、トピックストアでのトピックメタデータの処理をサポートします。
__strimzi_store_topic- トピックメタデータを保存するための入力トピック
__strimzi-topic-operator-kstreams-topic-store-changelog- 圧縮されたトピックストア値のログの維持
これらのトピックは、Topic Operator の実行に不可欠であるため、削除しないでください。
5.2.2.2. ZooKeeper からのトピックメタデータの移行 リンクのコピーリンクがクリップボードにコピーされました!
これまでのリリースの AMQ Streams では、トピックメタデータは ZooKeeper に保存されていました。新しいプロセスによってこの要件は除外されたため、メタデータは Kafka クラスターに取り込まれ、Topic Operator の制御下となります。
AMQ Streams 1.7 にアップグレードする場合、Topic Operator によってトピックストアが制御されるようにシームレスに移行されます。メタデータは ZooKeeper から検出および移行され、古いストアは削除されます。
5.2.2.3. ZooKeeper を使用してトピックメタデータを保存する AMQ Streams バージョンへのダウングレード リンクのコピーリンクがクリップボードにコピーされました!
トピックメタデータの保存に ZooKeeper を使用する 0.22 より前のバージョンの AMQ Streams に戻す場合でも、Cluster Operator を前のバージョンにダウングレードしてから、Kafka ブローカーおよびクライアントアプリケーションを前の Kafka バージョンにダウングレードします。
ただし、kafka-admin コマンドを使用して Kafka クラスターのブートストラップアドレスを指定し、トピックストア用に作成されたトピックを削除する必要もあります。以下に例を示します。
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
このコマンドは、Kafka クラスターへのアクセスに使用されるリスナーおよび認証のタイプに対応している必要があります。
Topic Operator は、Kafka のトピックの状態から ZooKeeper トピックメタデータを再構築します。
5.2.2.4. Topic Operator トピックのレプリケーションおよびスケーリング リンクのコピーリンクがクリップボードにコピーされました!
Topic Operator によって管理されるトピックには、トピックレプリケーション係数を 3 に設定し、最低でも 2 つの In-Sync レプリカを設定することが推奨されます。
In-Sync レプリカは、プロデューサーアプリケーションの acks 設定とともに使用されます。acks 設定は、メッセージの正常受信が確認される前に、メッセージがレプリケートされなければならないフォロワーパーティションの数を決定します。Topic Operator は acks=all で実行されます。すべての In-Sync レプリカによってメッセージが確認される必要があります。
ブローカーを追加または削除して Kafka クラスターをスケーリングする場合、レプリケーション係数設定は変更されず、レプリカは自動的に再割り当てされません。ただし、kafka-reassign-partitions.sh ツールを使用してレプリケーション係数を変更し、レプリカをブローカーに手動で再割り当てすることができます。
また、AMQ Streams の Cruise Control の統合ではトピックのレプリケーション係数を変更することはできませんが、Kafka をリバランスするために生成された最適化プロポーザルには、パーティションレプリカを転送し、パーティションリーダーを変更するコマンドが含まれます。
5.2.2.5. トピック変更の処理 リンクのコピーリンクがクリップボードにコピーされました!
Topic Operator にとって解決しなければならない基本的な問題として、信頼できる唯一の情報源 (SSOT: single source of truth) がないことがあります。KafkaTopic リソースと Kafka トピックの両方とも、Topic Operator に関係なく変更される可能性があります面倒なことに、Topic Operator は KafkaTopic リソースと Kafka トピックで変更を常にリアルタイムで監視できるとは限りません。たとえば、Topic Operator が停止した場合などがこれに該当します。
これを解決するために、Topic Operator はトピックストアの各トピックに関する情報を維持します。Kafka クラスターまたは OpenShift で変更が生じると、他のシステムの状態とトピックストアの両方を確認し、すべての同期が保たれるように何を変更する必要があるかを判断します。同じことが Topic Operator の起動時に必ず実行され、また Topic Operator の稼働中にも定期的に行われます。
たとえば、Topic Operator が実行されていないときに my-topic という KafkaTopic が作成された場合を考えてみましょう。Topic Operator の起動時、トピックストアには my-topic に関する情報が含まれないため、最後に実行された後に KafkaTopic が作成されたと推測できます。Topic Operator によって my-topic に対応するトピックが作成され、さらにトピックストアに my-topic のメタデータも格納されます。
Kafka トピック設定を更新するか、KafkaTopic カスタムリソースで変更を適用する場合、Kafka クラスターの調整後にトピックストアが更新されます。
また、トピックストアは Kafka トピックでトピック設定が変更され、かつ OpenShift KafkaTopic カスタムリソースで更新される場合に、変更が矛盾しない限り、Topic Operator による管理を可能にします。たとえば、同じトピック設定キーに変更を加えることはできますが、別の値への変更のみが可能です。変更に矛盾がある場合、Kafka の設定が優先され、KafkaTopic はそれに応じて更新されます。
KafkaTopic リソースを使用すると、oc delete -f KAFKA-TOPIC-CONFIG-FILE コマンドを使用してトピックを削除することもできます。これを可能にするには、Kafka リソースの spec.kafka.config で delete.topic.enable を true (デフォルト) に設定する必要があります。
5.2.3. Kafka トピックの設定 リンクのコピーリンクがクリップボードにコピーされました!
KafkaTopic リソースのプロパティーを使用して Kafka トピックを設定します。
oc apply を使用すると、トピックを作成または編集できます。oc delete を使用すると、既存のトピックを削除できます。
以下に例を示します。
-
oc apply -f <topic-config-file> -
oc delete KafkaTopic <topic-name>
この手順では、10 個のパーティションと 2 つのレプリカがあるトピックを作成する方法を説明します。
作業を開始する前の注意事項
以下を考慮してから変更を行うことが重要になります。
Kafka は
KafkaTopicリソースによる以下の変更をサポートしません。-
spec.topicNameを使用したトピック名の変更 -
spec.partitionsを使用したパーティションサイズの減少
-
-
spec.replicasを使用して最初に指定したレプリカの数を変更することはできません。 -
キーのあるトピックの
spec.partitionsを増やすと、レコードをパーティション化する方法が変更されます。これは、トピックがセマンティックパーティションを使用するとき、特に問題になる場合があります。
前提条件
- TLS 認証および暗号化を使用してリスナーで設定された 稼働中の Kafka クラスターが必要です。
- 稼働中の Topic Operator が必要です (通常は Entity Operator でデプロイされます)。
-
トピックを削除する場合は、
Kafkaリソースのspec.kafka.configがdelete.topic.enable=true(デフォルト) である必要があります。
手順
作成する
KafkaTopicが含まれるファイルを準備します。KafkaTopicの例Copy to Clipboard Copied! Toggle word wrap Toggle overflow ヒントトピックを変更する場合、現行バージョンのリソースは、
oc get kafkatopic orders -o yamlを使用して取得できます。OpenShift で
KafkaTopicリソースを作成します。oc apply -f TOPIC-CONFIG-FILE
oc apply -f TOPIC-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
5.2.4. リソース要求および制限のある Topic Operator の設定 リンクのコピーリンクがクリップボードにコピーされました!
CPU やメモリーなどのリソースを Topic Operator に割り当て、Topic Operator が消費できるリソースの量に制限を設定できます。
前提条件
- Cluster Operator が稼働している必要があります。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit kafka MY-CLUSTER
oc edit kafka MY-CLUSTERCopy to Clipboard Copied! Toggle word wrap Toggle overflow Kafkaリソースのspec.entityOperator.topicOperator.resourcesプロパティーで、Topic Operator のリソース要求および制限を設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい設定を適用してリソースを作成または更新します。
oc apply -f KAFKA-CONFIG-FILE
oc apply -f KAFKA-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
5.3. User Operator の使用 リンクのコピーリンクがクリップボードにコピーされました!
KafkaUser リソースを使用してユーザーを作成、編集、または削除する場合、User Operator によって変更が確実に Kafka クラスターで反映されます。
『OpenShift での AMQ Streams のデプロイおよびアップグレード』には、User Operator をデプロイする手順が記載されています。
スキーマの詳細は、「KafkaUser スキーマ参照」を参照してください。
Kafka へのアクセスの認証および承認
KafkaUser を使用して、Kafka にアクセスするために特定のクライアントが使用する認証および承認メカニズムを有効にします。
KafkUser を使用してユーザーを管理し、Kafka ブローカーへのアクセスをセキュアにする方法の詳細は、「Kafka ブローカーへのアクセスのセキュア化」を参照してください。
5.3.1. リソース要求および制限のある User Operator の設定 リンクのコピーリンクがクリップボードにコピーされました!
CPU やメモリーなどのリソースを User Operator に割り当て、User Operator が消費できるリソースの量に制限を設定できます。
前提条件
- Cluster Operator が稼働している必要があります。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit kafka MY-CLUSTER
oc edit kafka MY-CLUSTERCopy to Clipboard Copied! Toggle word wrap Toggle overflow Kafkaリソースのspec.entityOperator.userOperator.resourcesプロパティーで、User Operator のリソース要求および制限を設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow ファイルを保存し、エディターを終了します。Cluster Operator によって変更が自動的に適用されます。
5.4. Prometheus メトリクスを使用した Operator の監視 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams の operator は Prometheus メトリクスを公開します。メトリクスは自動で有効になり、以下の情報が含まれます。
- 調整の数
- operator が処理しているカスタムリソースの数
- 調整の期間
- operator からの JVM メトリクス
この他に、Grafana ダッシュボードのサンプルが提供されます。
詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「AMQ Streams のメトリクスおよびダッシュボードの設定」を参照してください。