第5章 AMQ Streams operator の使用
AMQ Streams の operator を使用して Kafka クラスターと Kafka トピックおよびユーザーを管理します。
5.1. Cluster Operator の使用
Cluster Operator は Kafka クラスターや他の Kafka コンポーネントをデプロイするために使用されます。
Cluster Operator は YAML インストールファイルを使用してデプロイされます。
OpenShift では、Kafka Connect デプロイメントに Source2Image 機能を組み込み、追加のコネクターを加えるための便利な方法として利用できます。
その他のリソース
- 『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「Cluster Operator のデプロイ」
- 「Kafka クラスターの設定」
5.1.1. Cluster Operator の設定
Cluster Operator は、サポートされる環境変数を使用してロギング設定から設定できます。
環境変数は、Cluster Operator イメージのデプロイメンのコンテナー設定に関連します。image
設定の詳細は、「image
」 を参照してください。
STRIMZI_NAMESPACE
Operator が操作する namespace のカンマ区切りのリスト。設定されていない場合や、空の文字列や
*
に設定された場合は、Cluster Operator はすべての namespace で操作します。Cluster Operator デプロイメントでは OpenShift Downward API を使用して、これを Cluster Operator がデプロイされる namespace に自動設定することがあります。Cluster Operator namespace の設定例
env: - name: STRIMZI_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
-
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
- 任意設定、デフォルトは 120000 ミリ秒です。定期的な調整の間隔 (秒単位)。
STRIMZI_OPERATION_TIMEOUT_MS
- 任意設定、デフォルトは 300000 ミリ秒です。内部操作のタイムアウト (ミリ秒単位)。この値は、標準の OpenShift 操作の時間が通常よりも長いクラスターで (Docker イメージのダウンロードが遅い場合など) AMQ Streams を使用する場合に増やす必要があります。
STRIMZI_OPERATOR_NAMESPACE
AMQ Streams Cluster Operator が稼働している namespace の名前。この変数は手動で設定しないでください。OpenShift Downward API を使用します。
env: - name: STRIMZI_OPERATOR_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
STRIMZI_OPERATOR_NAMESPACE_LABELS
任意設定。AMQ Streams Cluster Operator が稼働している namespace のラベル。namespace ラベルは、ネットワークポリシーで namespace セレクターを設定するために使用されます。これにより、AMQ Streams Cluster Operator はこれらのラベルを持つ namespace からのオペランドのみにアクセスできます。設定されていない場合、ネットワークポリシーの namespace セレクターは、OpenShift クラスターのすべての namespace から AMQ Streams Cluster Operator にアクセスできるように設定されます。
env: - name: STRIMZI_OPERATOR_NAMESPACE_LABELS value: label1=value1,label2=value2
STRIMZI_CUSTOM_RESOURCE_SELECTOR
任意設定。Operator によって処理されるカスタムリソースのフィルタリングに使用されるラベルセレクターを指定します。Operator は、指定されたラベルが設定されているカスタムリソースでのみ動作します。これらのラベルのないリソースは Operator によって認識されません。ラベルセレクターは、
Kafka
、KafkaConnect
、KafkaConnectS2I
、KafkaBridge
、KafkaMirrorMaker
、およびKafkaMirrorMaker2
リソースに適用されます。KafkaRebalance
およびKafkaConnector
リソースは、対応する Kafka および Kafka Connect クラスターに一致するラベルがある場合にのみ操作されます。env: - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR value: label1=value1,label2=value2
STRIMZI_KAFKA_IMAGES
-
必須。Kafka バージョンから、そのバージョンの Kafka ブローカーが含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>
ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
これは、Kafka.spec.kafka.version
プロパティーが指定され、Kafka
リソースのKafka.spec.kafka.image
は指定されていない場合に使用されます。 STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0
です。Kafka
リソースでkafka-init-image
として指定されたイメージがない場合に、初期設定作業 (ラックサポート) のブローカーの前に開始される init コンテナーのデフォルトとして使用するイメージ名。 STRIMZI_KAFKA_CONNECT_IMAGES
-
必須。Kafka バージョンから、そのバージョンの Kafka Connect が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>
ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
これは、KafkaConnect.spec.version
プロパティーが指定され、KafkaConnect.spec.image
は指定されていない場合に使用されます。 STRIMZI_KAFKA_CONNECT_S2I_IMAGES
-
必須。Kafka バージョンから、そのバージョンの Kafka Connect が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>
ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
これは、KafkaConnectS2I.spec.version
プロパティーが指定され、KafkaConnectS2I.spec.image
は指定されていない場合に使用されます。 STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
-
必須。Kafka バージョンから、そのバージョンの Kafka Mirror Maker が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの
<version>=<image>
ペアです。例:2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
これは、KafkaMirrorMaker.spec.version
プロパティーが指定され、KafkaMirrorMaker.spec.image
は指定されていない場合に使用されます。 STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0
です。Kafka
リソースにKafka.spec.entityOperator.topicOperator.image
として指定されたイメージがない場合に、Topic Operator のデプロイ時にデフォルトとして使用するイメージ名。 STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0
です。Kafka
リソースにKafka.spec.entityOperator.userOperator.image
として指定されたイメージがない場合に、User Operator のデプロイ時にデフォルトとして使用するイメージ名。 STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
-
任意設定で、デフォルトは
registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0
です。Kafka
リソースでKafka.spec.entityOperator.tlsSidecar.image
として指定されたイメージがない場合に、Entity Operator の TLS サポートを提供するサイドカーコンテナーをデプロイする際にデフォルトとして使用するイメージ名。 STRIMZI_IMAGE_PULL_POLICY
-
任意設定。AMQ Streams の Cluster Operator によって管理されるすべての Pod のコンテナーに適用される
ImagePullPolicy
。有効な値は、Always
、IfNotPresent
、およびNever
です。指定のない場合、OpenShift のデフォルトが使用されます。ポリシーを変更すると、すべての Kafka、Kafka Connect、および Kafka MirrorMaker クラスターのローリングアップデートが実行されます。 STRIMZI_IMAGE_PULL_SECRETS
-
任意設定。
Secret
名のカンマ区切りのリスト。ここで参照されるシークレットには、コンテナーイメージがプルされるコンテナーレジストリーへのクレデンシャルが含まれます。シークレットは、Cluster Operator によって作成されるすべてのPods
のimagePullSecrets
フィールドで使用されます。このリストを変更すると、Kafka、Kafka Connect、および Kafka MirrorMaker のすべてのクラスターのローリングアップデートが実行されます。 STRIMZI_KUBERNETES_VERSION
任意設定。API サーバーから検出された Kubernetes バージョン情報をオーバーライドします。
Kubernetes バージョンオーバーライドの設定例
env: - name: STRIMZI_KUBERNETES_VERSION value: | major=1 minor=16 gitVersion=v1.16.2 gitCommit=c97fe5036ef3df2967d086711e6c0c405941e14b gitTreeState=clean buildDate=2019-10-15T19:09:08Z goVersion=go1.12.10 compiler=gc platform=linux/amd64
KUBERNETES_SERVICE_DNS_DOMAIN
任意設定。デフォルトの OpenShift DNS サフィックスを上書きします。
デフォルトでは、OpenShfit クラスターで割り当てられるサービスに、デフォルトのサフィックス
cluster.local
を使用する DNS ドメイン名があります。ブローカーが kafka-0 の場合の例は次のとおりです。
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local
DNS ドメイン名は、ホスト名の検証に使用される Kafka ブローカー証明書に追加されます。
クラスターで異なる DNS サフィックスを使用している場合、Kafka ブローカーとの接続を確立するために、
KUBERNETES_SERVICE_DNS_DOMAIN
環境変数をデフォルトから現在使用中の DNS サフィックスに変更します。STRIMZI_CONNECT_BUILD_TIMEOUT_MS
- 任意設定、デフォルトは 300000 ミリ秒です。追加のコネクターで新しい Kafka Connect イメージをビルドする場合のタイムアウト (ミリ秒単位)。AMQ Streams を使用して多くのコネクターが含まれるコンテナーイメージをビルドしたり、低速なコンテナーレジストリーを使用する場合は、この値を大きくする必要があります。
5.1.1.1. ConfigMap による設定のロギング
Cluster Operator のロギングは strimzi-cluster-operator
ConfigMap
によって設定されます。
ロギング設定を含む ConfigMap
は、Cluster Operator のインストール時に作成されます。この ConfigMap
は、ファイル install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
に記述されています。Cluster Operator のロギングを設定するには、この ConfigMap
のデータフィールド log4j2.properties
を変更します。
ロギング設定を更新するには、050-ConfigMap-strimzi-cluster-operator.yaml
ファイルを編集して以下のコマンドを実行します。
oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
ConfigMap
を直接編集することもできます。
oc edit cm strimzi-cluster-operator
リロード間隔の頻度を変更するには、作成された ConfigMap
の monitorInterval
オプションで、秒単位の時間を設定します。
Cluster Operator のデプロイ時に ConfigMap
がない場合、デフォルトのロギング値が使用されます。
Cluster Operator のデプロイ後に ConfigMap
が誤って削除された場合、最後にロードされたロギング設定が使用されます。ConfigMap
を新たに作成し、新しいロギング設定を読み込みます。
ConfigMap から monitorInterval
オプションを削除しないでください。
5.1.1.2. ネットワークポリシーによる Cluster Operator アクセスの制限
Cluster Operator は、管理するリソースと同じ namespace または別の namespace で実行できます。デフォルトでは、STRIMZI_OPERATOR_NAMESPACE
環境変数は、OpenShift Downward API を使用して Cluster Operator が稼働している namespace を検索するように設定されます。Cluster Operator がリソースと同じ namespace で実行されている場合は、ローカルアクセスのみが必要で、AMQ Sreams によって許可されます。
Cluster Operator が管理するリソースとは別の namespace で実行されている場合、ネットワークポリシーが設定されている場合を除き、OpenShift クラスターのすべての namespace は Cluster Operator へのアクセスが許可されます。任意の STRIMZI_OPERATOR_NAMESPACE_LABELS
環境変数を使用し、namespace ラベルを使用して Cluster Operator のネットワークポリシーを確立します。namespace ラベルを追加すると、Cluster Operator へのアクセスは指定された namespace に限定されます。
Cluster Operator デプロイメントに設定されたネットワークポリシー
#... env: # ... - name: STRIMZI_OPERATOR_NAMESPACE_LABELS value: label1=value1,label2=value2 #...
5.1.1.3. 定期的な調整
Cluster Operator は OpenShift クラスターから受信する必要なクラスターリソースに関するすべての通知に対応しますが、Operator が実行されていない場合や、何らかの理由で通知が受信されない場合、必要なリソースは実行中の OpenShift クラスターの状態と同期しなくなります。
フェイルオーバーを適切に処理するために、Cluster Operator によって定期的な調整プロセスが実行され、必要なリソースすべてで一貫した状態になるように、必要なリソースの状態を現在のクラスターデプロイメントと比較できます。[STRIMZI_FULL_RECONCILIATION_INTERVAL_MS] 変数を使用して、定期的な調整の時間間隔を設定できます。
5.1.2. ロールベースアクセス制御 (RBAC) のプロビジョニング
Cluster Operator が機能するには、Kafka
、KafkaConnect
などのリソースや ConfigMaps
、Pods
、Deployments
、StatefulSets
、および Services
の管理リソースと対話するために OpenShift クラスター内でパーミッションが必要になり ます。このようなパーミッションは、OpenShift のロールベースアクセス制御 (RBAC) リソースに記述されます。
-
ServiceAccount
-
Role
およびClusterRole
-
RoleBinding
およびClusterRoleBinding
Cluster Operator は、ClusterRoleBinding
を使用して独自の ServiceAccount
で実行される他に、OpenShift リソースへのアクセスを必要とするコンポーネントの RBAC リソースを管理します。
また OpenShift には、ServiceAccount
で動作するコンポーネントが、その ServiceAccount
にはない他の ServiceAccounts
の権限を付与しないようにするための特権昇格の保護機能も含まれています。Cluster Operator は、ClusterRoleBindings
と、それが管理するリソースで必要な RoleBindings
を作成できる必要があるため、Cluster Operator にも同じ権限が必要です。
5.1.2.1. 委譲された権限
Cluster Operator が必要な Kafka
リソースのリソースをデプロイする場合、以下のように ServiceAccounts
、RoleBindings
、および ClusterRoleBindings
も作成します。
Kafka ブローカー Pod は
cluster-name-kafka
というServiceAccount
を使用します。-
ラック機能が使用されると、
strimzi-cluster-name-kafka-init
ClusterRoleBinding
は、strimzi-kafka-broker
と呼ばれるClusterRole
経由で、クラスター内のノードへのServiceAccount
アクセスを付与するために使用されます。 - ラック機能が使用されていない場合は、バインディングは作成されません。
-
ラック機能が使用されると、
-
ZooKeeper Pod は
cluster-name-zookeeper
というServiceAccount
を使用します。 Entity Operator は、
cluster-name-entity-operator
というServiceAccount
を使用します。-
Topic Operator はステータス情報のある OpenShift イベントを生成し、
ServiceAccount
がstrimzi-entity-operator
というClusterRole
にバインドされるようにします。strimzi-entity-operator
はこのアクセス権限をstrimzi-entity-operator
RoleBinding
経由で付与します。
-
Topic Operator はステータス情報のある OpenShift イベントを生成し、
-
KafkaConnect
およびKafkaConnectS2I
リソースの Pod はcluster-name-cluster-connect
というServiceAccount
を使用します。 -
KafkaMirrorMaker
の Pod はcluster-name-mirror-maker
というServiceAccount
を使用します。 -
KafkaMirrorMaker2
の Pod はcluster-name-mirrormaker2
というServiceAccount
を使用します。 -
KafkaBridge
の Pod はcluster-name-bridge
というServiceAccount
を使用します。
5.1.2.2. ServiceAccount
Cluster Operator は ServiceAccount
を使用して最適に実行されます。
Cluster Operator の ServiceAccount
の例
apiVersion: v1 kind: ServiceAccount metadata: name: strimzi-cluster-operator labels: app: strimzi
その後、Cluster Operator の Deployment
で、これを spec.template.spec.serviceAccountName
に指定する必要があります。
Cluster Operator の Deployment
の部分的な例
apiVersion: apps/v1 kind: Deployment metadata: name: strimzi-cluster-operator labels: app: strimzi spec: replicas: 1 selector: matchLabels: name: strimzi-cluster-operator strimzi.io/kind: cluster-operator template: # ...
strimzi-cluster-operator
ServiceAccount
が serviceAccountName
として指定されている 12 行目に注目してください。
5.1.2.3. ClusterRoles
Cluster Operator は、必要なリソースへのアクセス権限を付与する ClusterRoles
を使用して操作する必要があります。OpenShift クラスターの設定によっては、クラスター管理者が ClusterRoles
を作成する必要があることがあります。
クラスター管理者の権限は ClusterRoles
の作成にのみ必要です。Cluster Operator はクラスター管理者アカウントで実行されません。
ClusterRoles
は、 最小権限の原則に従い、Kafka、Kafka Connect、および ZooKeeper クラスターを操作するために Cluster Operator が必要とする権限のみが含まれます。最初に割り当てられた一連の権限により、Cluster Operator で StatefulSets
、Deployments
、Pods
、および ConfigMaps
などの OpenShift リソースを管理できます。
Cluster Operator は ClusterRoles を使用して、namespace スコープリソースのレベルおよびクラスタースコープリソースのレベルで権限を付与します。
Cluster Operator の namespaced リソースのある ClusterRole
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: strimzi-cluster-operator-namespaced labels: app: strimzi rules: - apiGroups: - "rbac.authorization.k8s.io" resources: # The cluster operator needs to access and manage rolebindings to grant Strimzi components cluster permissions - rolebindings verbs: - get - list - watch - create - delete - patch - update - apiGroups: - "rbac.authorization.k8s.io" resources: # The cluster operator needs to access and manage roles to grant the entity operator permissions - roles verbs: - get - list - watch - create - delete - patch - update - apiGroups: - "" resources: # The cluster operator needs to access and delete pods, this is to allow it to monitor pod health and coordinate rolling updates - pods # The cluster operator needs to access and manage service accounts to grant Strimzi components cluster permissions - serviceaccounts # The cluster operator needs to access and manage config maps for Strimzi components configuration - configmaps # The cluster operator needs to access and manage services and endpoints to expose Strimzi components to network traffic - services - endpoints # The cluster operator needs to access and manage secrets to handle credentials - secrets # The cluster operator needs to access and manage persistent volume claims to bind them to Strimzi components for persistent data - persistentvolumeclaims verbs: - get - list - watch - create - delete - patch - update - apiGroups: - "kafka.strimzi.io" resources: # The cluster operator runs the KafkaAssemblyOperator, which needs to access and manage Kafka resources - kafkas - kafkas/status # The cluster operator runs the KafkaConnectAssemblyOperator, which needs to access and manage KafkaConnect resources - kafkaconnects - kafkaconnects/status # The cluster operator runs the KafkaConnectS2IAssemblyOperator, which needs to access and manage KafkaConnectS2I resources - kafkaconnects2is - kafkaconnects2is/status # The cluster operator runs the KafkaConnectorAssemblyOperator, which needs to access and manage KafkaConnector resources - kafkaconnectors - kafkaconnectors/status # The cluster operator runs the KafkaMirrorMakerAssemblyOperator, which needs to access and manage KafkaMirrorMaker resources - kafkamirrormakers - kafkamirrormakers/status # The cluster operator runs the KafkaBridgeAssemblyOperator, which needs to access and manage BridgeMaker resources - kafkabridges - kafkabridges/status # The cluster operator runs the KafkaMirrorMaker2AssemblyOperator, which needs to access and manage KafkaMirrorMaker2 resources - kafkamirrormaker2s - kafkamirrormaker2s/status # The cluster operator runs the KafkaRebalanceAssemblyOperator, which needs to access and manage KafkaRebalance resources - kafkarebalances - kafkarebalances/status verbs: - get - list - watch - create - delete - patch - update - apiGroups: # The cluster operator needs the extensions api as the operator supports Kubernetes version 1.11+ # apps/v1 was introduced in Kubernetes 1.14 - "extensions" resources: # The cluster operator needs to access and manage deployments to run deployment based Strimzi components - deployments - deployments/scale # The cluster operator needs to access replica sets to manage Strimzi components and to determine error states - replicasets # The cluster operator needs to access and manage replication controllers to manage replicasets - replicationcontrollers # The cluster operator needs to access and manage network policies to lock down communication between Strimzi components - networkpolicies # The cluster operator needs to access and manage ingresses which allow external access to the services in a cluster - ingresses verbs: - get - list - watch - create - delete - patch - update - apiGroups: - "apps" resources: # The cluster operator needs to access and manage deployments to run deployment based Strimzi components - deployments - deployments/scale - deployments/status # The cluster operator needs to access and manage stateful sets to run stateful sets based Strimzi components - statefulsets # The cluster operator needs to access replica-sets to manage Strimzi components and to determine error states - replicasets verbs: - get - list - watch - create - delete - patch - update - apiGroups: - "" resources: # The cluster operator needs to be able to create events and delegate permissions to do so - events verbs: - create - apiGroups: # OpenShift S2I requirements - apps.openshift.io resources: - deploymentconfigs - deploymentconfigs/scale - deploymentconfigs/status - deploymentconfigs/finalizers verbs: - get - list - watch - create - delete - patch - update - apiGroups: # OpenShift S2I requirements - build.openshift.io resources: - buildconfigs - buildconfigs/instantiate - builds verbs: - get - list - watch - create - delete - patch - update - apiGroups: # OpenShift S2I requirements - image.openshift.io resources: - imagestreams - imagestreams/status verbs: - get - list - watch - create - delete - patch - update - apiGroups: - networking.k8s.io resources: # The cluster operator needs to access and manage network policies to lock down communication between Strimzi components - networkpolicies # The cluster operator needs to access and manage ingresses which allow external access to the services in a cluster - ingresses verbs: - get - list - watch - create - delete - patch - update - apiGroups: - route.openshift.io resources: # The cluster operator needs to access and manage routes to expose Strimzi components for external access - routes - routes/custom-host verbs: - get - list - watch - create - delete - patch - update - apiGroups: - policy resources: # The cluster operator needs to access and manage pod disruption budgets this limits the number of concurrent disruptions # that a Strimzi component experiences, allowing for higher availability - poddisruptionbudgets verbs: - get - list - watch - create - delete - patch - update
2 番目の一連の権限には、クラスタースコープリソースに必要な権限が含まれます。
Cluster Operator のクラスタースコープリソースのある ClusterRole
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: strimzi-cluster-operator-global labels: app: strimzi rules: - apiGroups: - "rbac.authorization.k8s.io" resources: # The cluster operator needs to create and manage cluster role bindings in the case of an install where a user # has specified they want their cluster role bindings generated - clusterrolebindings verbs: - get - list - watch - create - delete - patch - update - apiGroups: - storage.k8s.io resources: # The cluster operator requires "get" permissions to view storage class details # This is because only a persistent volume of a supported storage class type can be resized - storageclasses verbs: - get - apiGroups: - "" resources: # The cluster operator requires "list" permissions to view all nodes in a cluster # The listing is used to determine the node addresses when NodePort access is configured # These addresses are then exposed in the custom resource states - nodes verbs: - list
strimzi-kafka-broker
ClusterRole
は、ラック機能に使用される Kafka Pod の init コンテナーが必要とするアクセス権限を表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。
Cluster Operator の ClusterRole
により、OpenShift ノードへのアクセスを Kafka ブローカー Pod に委譲できます。
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: strimzi-kafka-broker labels: app: strimzi rules: - apiGroups: - "" resources: # The Kafka Brokers require "get" permissions to view the node they are on # This information is used to generate a Rack ID that is used for High Availability configurations - nodes verbs: - get
strimzi-topic-operator
の ClusterRole
は、Topic Operator が必要とするアクセスを表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。
Cluster Operator の ClusterRole
により、イベントへのアクセスを Topic Operator に委譲できます。
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: strimzi-entity-operator labels: app: strimzi rules: - apiGroups: - "kafka.strimzi.io" resources: # The entity operator runs the KafkaTopic assembly operator, which needs to access and manage KafkaTopic resources - kafkatopics - kafkatopics/status # The entity operator runs the KafkaUser assembly operator, which needs to access and manage KafkaUser resources - kafkausers - kafkausers/status verbs: - get - list - watch - create - patch - update - delete - apiGroups: - "" resources: - events verbs: # The entity operator needs to be able to create events - create - apiGroups: - "" resources: # The entity operator user-operator needs to access and manage secrets to store generated credentials - secrets verbs: - get - list - watch - create - delete - patch - update
strimzi-kafka-client
ClusterRole
は、クライアントのラックアウェアネスを使用する Kafka クライアントをベースとするコンポーネントが必要なアクセスを表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。
Cluster Operator の ClusterRole
により、OpenShift ノードへのアクセスを Kafka クライアントベースの Pod に委譲できます。
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: strimzi-kafka-client labels: app: strimzi rules: - apiGroups: - "" resources: # The Kafka clients (Connect, Mirror Maker, etc.) require "get" permissions to view the node they are on # This information is used to generate a Rack ID (client.rack option) that is used for consuming from the closest # replicas when enabled - nodes verbs: - get
5.1.2.4. ClusterRoleBindings
Operator には ClusterRoleBindings
と、ClusterRole
をServiceAccount
に関連付ける RoleBindings
が必要です。ClusterRoleBindings
は、クラスタースコープリロースが含まれる ClusterRoles
に必要です。
Cluster Operator の ClusterRoleBinding
の例
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: strimzi-cluster-operator labels: app: strimzi subjects: - kind: ServiceAccount name: strimzi-cluster-operator namespace: myproject roleRef: kind: ClusterRole name: strimzi-cluster-operator-global apiGroup: rbac.authorization.k8s.io
ClusterRoleBindings
は、委譲に必要な ClusterRoles
にも必要です。
Kafka ブローカーラックアウェアネスの Cluster Operator の ClusterRoleBinding
例
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: strimzi-cluster-operator-kafka-broker-delegation labels: app: strimzi # The Kafka broker cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Kafka brokers. # This must be done to avoid escalating privileges which would be blocked by Kubernetes. subjects: - kind: ServiceAccount name: strimzi-cluster-operator namespace: myproject roleRef: kind: ClusterRole name: strimzi-kafka-broker apiGroup: rbac.authorization.k8s.io
以下も必要です。
Kafka クライアントラックアウェアネスの Cluster Operator の ClusterRoleBinding
例
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: strimzi-cluster-operator-kafka-client-delegation labels: app: strimzi # The Kafka clients cluster role must be bound to the cluster operator service account so that it can delegate the # cluster role to the Kafka clients using it for consuming from closest replica. # This must be done to avoid escalating privileges which would be blocked by Kubernetes. subjects: - kind: ServiceAccount name: strimzi-cluster-operator namespace: myproject roleRef: kind: ClusterRole name: strimzi-kafka-client apiGroup: rbac.authorization.k8s.io
namespaced リソースのみが含まれる ClusterRoles
は、RoleBindings
のみを使用してバインドされます。
apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: strimzi-cluster-operator labels: app: strimzi subjects: - kind: ServiceAccount name: strimzi-cluster-operator namespace: myproject roleRef: kind: ClusterRole name: strimzi-cluster-operator-namespaced apiGroup: rbac.authorization.k8s.io
apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: strimzi-cluster-operator-entity-operator-delegation labels: app: strimzi # The Entity Operator cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Entity Operator. # This must be done to avoid escalating privileges which would be blocked by Kubernetes. subjects: - kind: ServiceAccount name: strimzi-cluster-operator namespace: myproject roleRef: kind: ClusterRole name: strimzi-entity-operator apiGroup: rbac.authorization.k8s.io
5.2. Topic Operator の使用
KafkaTopic
リソースを使用してトピックを作成、編集、または削除する場合、Topic Operator によって変更が確実に Kafka クラスターで反映されます。
『OpenShift での AMQ Streams のデプロイおよびアップグレード』には、Topic Operator をデプロイする手順が記載されています。
5.2.1. Kafka トピックリソース
KafkaTopic
リソースは、パーティションやレプリカの数を含む、トピックの設定に使用されます。
KafkaTopic
の完全なスキーマは、「KafkaTopic
スキーマ参照」で確認できます。
5.2.1.1. トピック処理用の Kafka クラスターの特定
KafkaTopic
リソースには、このリソースが属する Kafka クラスターに適した名前 (Kafka
リソースの名前から派生) を定義するラベルが含まれています。
以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: topic-name-1 labels: strimzi.io/cluster: my-cluster
ラベルは、KafkaTopic
リソースを特定し、新しいトピックを作成するために、Topic Operator によって使用されます。また、以降のトピックの処理でも使用されます。
ラベルが Kafka クラスターと一致しない場合、Topic Operator は KafkaTopic
を識別できず、トピックは作成されません。
5.2.1.2. Kafka トピックの使用に関する推奨事項
トピックを使用する場合は、整合性を保ちます。常に KafkaTopic
リソースで作業を行うか、直接 OpenShift でトピックを扱います。特定のトピックで、両方の方法を頻繁に切り替えないでください。
トピックの性質を反映するトピック名を使用し、後で名前を変更できないことに注意してください。
Kafka でトピックを作成する場合は、有効な OpenShift リソース名である名前を使用します。それ以外の場合は、Topic Operator は対応する KafkaTopic
を OpenShift ルールに準じた名前で作成する必要があります。
OpenShift の識別子および名前の推奨事項については、OpenShift コミュニティーの記事「Identifiers and Names」を参照してください。
5.2.1.3. Kafka トピックの命名規則
Kafka と OpenShift では、Kafka と KafkaTopic.metadata.name
でのトピックの命名にそれぞれ独自の検証ルールを適用します。トピックごとに有効な名前があり、他のトピックには無効です。
spec.topicName
プロパティーを使用すると、OpenShift の Kafka トピックでは無効な名前を使用して、Kafka で有効なトピックを作成できます。
spec.topicName
プロパティーは Kafka の命名検証ルールを継承します。
- 249 文字を超える名前は使用できません。
-
Kafka トピックの有効な文字は ASCII 英数字、
.
、_
、および-
です。 -
名前を
.
または..
にすることはできませんが、.
はexampleTopic.
や.exampleTopic
のように名前で使用できます。
spec.topicName
は変更しないでください。
以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: topic-name-1
spec:
topicName: topicName-1 1
# ...
- 1
- OpenShift では大文字は無効です。
上記は下記のように変更できません。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: topic-name-1 spec: topicName: name-2 # ...
Kafka Streams など一部の Kafka クライアントアプリケーションは、プログラムを使用して Kafka でトピックを作成できます。これらのトピックに、OpenShift リソース名としては無効な名前が付いている場合、Topic Operator はそれらのトピックに Kafka 名に基づく有効な metadata.name
を付けます。無効な文字が置き換えられ、ハッシュが名前に追加されます。以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: mytopic---c55e57fe2546a33f9e603caf57165db4072e827e spec: topicName: myTopic # ...
5.2.2. Topic Operator のトピックストア
Topic Operator は Kafka を使用して、トピック設定をキーと値のペアとして記述するトピックメタデータを保存します。トピックストアは、Kafka トピックを使用して状態を永続化する Kafka Streams のキーバリューメカニズムを基にしています。
トピックメタデータはインメモリーでキャッシュされ、Topic Operator 内にてローカルでアクセスされます。ローカルのインメモリーキャッシュに適用される操作からの更新は、ディスク上のバックアップトピックストアに永続化されます。トピックストアは、Kafka トピックまたは OpenShift KafkaTopic
カスタムリソースからの更新と継続的に同期されます。操作は、このような方法で設定されたトピックストアで迅速に処理されますが、インメモリーキャッシュがクラッシュした場合は、永続ストレージから自動的にデータが再入力されます。
5.2.2.1. 内部トピックストアトピック
内部トピックは、トピックストアでのトピックメタデータの処理をサポートします。
__strimzi_store_topic
- トピックメタデータを保存するための入力トピック
__strimzi-topic-operator-kstreams-topic-store-changelog
- 圧縮されたトピックストア値のログの維持
これらのトピックは、Topic Operator の実行に不可欠であるため、削除しないでください。
5.2.2.2. ZooKeeper からのトピックメタデータの移行
これまでのリリースの AMQ Streams では、トピックメタデータは ZooKeeper に保存されていました。新しいプロセスによってこの要件は除外されたため、メタデータは Kafka クラスターに取り込まれ、Topic Operator の制御下となります。
AMQ Streams 1.7 にアップグレードする場合、Topic Operator によってトピックストアが制御されるようにシームレスに移行されます。メタデータは ZooKeeper から検出および移行され、古いストアは削除されます。
5.2.2.3. ZooKeeper を使用してトピックメタデータを保存する AMQ Streams バージョンへのダウングレード
トピックメタデータの保存に ZooKeeper を使用する 0.22 より前のバージョンの AMQ Streams に戻す場合でも、Cluster Operator を前のバージョンにダウングレードしてから、Kafka ブローカーおよびクライアントアプリケーションを前の Kafka バージョンにダウングレードします。
ただし、kafka-admin
コマンドを使用して Kafka クラスターのブートストラップアドレスを指定し、トピックストア用に作成されたトピックを削除する必要もあります。以下に例を示します。
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
このコマンドは、Kafka クラスターへのアクセスに使用されるリスナーおよび認証のタイプに対応している必要があります。
Topic Operator は、Kafka のトピックの状態から ZooKeeper トピックメタデータを再構築します。
5.2.2.4. Topic Operator トピックのレプリケーションおよびスケーリング
Topic Operator によって管理されるトピックには、トピックレプリケーション係数を 3 に設定し、最低でも 2 つの In-Sync レプリカを設定することが推奨されます。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 1 1 replicas: 3 2 config: min.insync.replicas=2 3 #...
In-Sync レプリカは、プロデューサーアプリケーションの acks
設定とともに使用されます。acks
設定は、メッセージの正常受信が確認される前に、メッセージがレプリケートされなければならないフォロワーパーティションの数を決定します。Topic Operator は acks=all
で実行されます。すべての In-Sync レプリカによってメッセージが確認される必要があります。
ブローカーを追加または削除して Kafka クラスターをスケーリングする場合、レプリケーション係数設定は変更されず、レプリカは自動的に再割り当てされません。ただし、kafka-reassign-partitions.sh
ツールを使用してレプリケーション係数を変更し、レプリカをブローカーに手動で再割り当てすることができます。
また、AMQ Streams の Cruise Control の統合ではトピックのレプリケーション係数を変更することはできませんが、Kafka をリバランスするために生成された最適化プロポーザルには、パーティションレプリカを転送し、パーティションリーダーを変更するコマンドが含まれます。
5.2.2.5. トピック変更の処理
Topic Operator にとって解決しなければならない基本的な問題として、信頼できる唯一の情報源 (SSOT: single source of truth) がないことがあります。KafkaTopic
リソースと Kafka トピックの両方とも、Topic Operator に関係なく変更される可能性があります面倒なことに、Topic Operator は KafkaTopic リソースと Kafka トピックで変更を常にリアルタイムで監視できるとは限りません。たとえば、Topic Operator が停止した場合などがこれに該当します。
これを解決するために、Topic Operator はトピックストアの各トピックに関する情報を維持します。Kafka クラスターまたは OpenShift で変更が生じると、他のシステムの状態とトピックストアの両方を確認し、すべての同期が保たれるように何を変更する必要があるかを判断します。同じことが Topic Operator の起動時に必ず実行され、また Topic Operator の稼働中にも定期的に行われます。
たとえば、Topic Operator が実行されていないときに my-topic という KafkaTopic
が作成された場合を考えてみましょう。Topic Operator の起動時、トピックストアには my-topic に関する情報が含まれないため、最後に実行された後に KafkaTopic
が作成されたと推測できます。Topic Operator によって my-topic に対応するトピックが作成され、さらにトピックストアに my-topic のメタデータも格納されます。
Kafka トピック設定を更新するか、KafkaTopic
カスタムリソースで変更を適用する場合、Kafka クラスターの調整後にトピックストアが更新されます。
また、トピックストアは Kafka トピックでトピック設定が変更され、かつ OpenShift KafkaTopic
カスタムリソースで更新される場合に、変更が矛盾しない限り、Topic Operator による管理を可能にします。たとえば、同じトピック設定キーに変更を加えることはできますが、別の値への変更のみが可能です。変更に矛盾がある場合、Kafka の設定が優先され、KafkaTopic
はそれに応じて更新されます。
KafkaTopic
リソースを使用すると、oc delete -f KAFKA-TOPIC-CONFIG-FILE
コマンドを使用してトピックを削除することもできます。これを可能にするには、Kafka リソースの spec.kafka.config
で delete.topic.enable
を true
(デフォルト) に設定する必要があります。
5.2.3. Kafka トピックの設定
KafkaTopic
リソースのプロパティーを使用して Kafka トピックを設定します。
oc apply
を使用すると、トピックを作成または編集できます。oc delete
を使用すると、既存のトピックを削除できます。
以下に例を示します。
-
oc apply -f <topic-config-file>
-
oc delete KafkaTopic <topic-name>
この手順では、10 個のパーティションと 2 つのレプリカがあるトピックを作成する方法を説明します。
作業を開始する前の注意事項
以下を考慮してから変更を行うことが重要になります。
Kafka は
KafkaTopic
リソースによる以下の変更をサポートしません。-
spec.topicName
を使用したトピック名の変更 -
spec.partitions
を使用したパーティションサイズの減少
-
-
spec.replicas
を使用して最初に指定したレプリカの数を変更することはできません。 -
キーのあるトピックの
spec.partitions
を増やすと、レコードをパーティション化する方法が変更されます。これは、トピックがセマンティックパーティションを使用するとき、特に問題になる場合があります。
前提条件
- TLS 認証および暗号化を使用してリスナーで設定された 稼働中の Kafka クラスターが必要です。
- 稼働中の Topic Operator が必要です (通常は Entity Operator でデプロイされます)。
-
トピックを削除する場合は、
Kafka
リソースのspec.kafka.config
がdelete.topic.enable=true
(デフォルト) である必要があります。
手順
作成する
KafkaTopic
が含まれるファイルを準備します。KafkaTopic
の例apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: orders labels: strimzi.io/cluster: my-cluster spec: partitions: 10 replicas: 2
ヒントトピックを変更する場合、現行バージョンのリソースは、
oc get kafkatopic orders -o yaml
を使用して取得できます。OpenShift で
KafkaTopic
リソースを作成します。oc apply -f TOPIC-CONFIG-FILE
5.2.4. リソース要求および制限のある Topic Operator の設定
CPU やメモリーなどのリソースを Topic Operator に割り当て、Topic Operator が消費できるリソースの量に制限を設定できます。
前提条件
- Cluster Operator が稼働している必要があります。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit kafka MY-CLUSTER
Kafka
リソースのspec.entityOperator.topicOperator.resources
プロパティーで、Topic Operator のリソース要求および制限を設定します。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # Kafka and ZooKeeper sections... entityOperator: topicOperator: resources: requests: cpu: "1" memory: 500Mi limits: cpu: "1" memory: 500Mi
新しい設定を適用してリソースを作成または更新します。
oc apply -f KAFKA-CONFIG-FILE
5.3. User Operator の使用
KafkaUser
リソースを使用してユーザーを作成、編集、または削除する場合、User Operator によって変更が確実に Kafka クラスターで反映されます。
『OpenShift での AMQ Streams のデプロイおよびアップグレード』には、User Operator をデプロイする手順が記載されています。
スキーマの詳細は、「KafkaUser
スキーマ参照」を参照してください。
Kafka へのアクセスの認証および承認
KafkaUser
を使用して、Kafka にアクセスするために特定のクライアントが使用する認証および承認メカニズムを有効にします。
KafkUser
を使用してユーザーを管理し、Kafka ブローカーへのアクセスをセキュアにする方法の詳細は、「Kafka ブローカーへのアクセスのセキュア化」を参照してください。
5.3.1. リソース要求および制限のある User Operator の設定
CPU やメモリーなどのリソースを User Operator に割り当て、User Operator が消費できるリソースの量に制限を設定できます。
前提条件
- Cluster Operator が稼働している必要があります。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit kafka MY-CLUSTER
Kafka
リソースのspec.entityOperator.userOperator.resources
プロパティーで、User Operator のリソース要求および制限を設定します。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # Kafka and ZooKeeper sections... entityOperator: userOperator: resources: requests: cpu: "1" memory: 500Mi limits: cpu: "1" memory: 500Mi
ファイルを保存し、エディターを終了します。Cluster Operator によって変更が自動的に適用されます。
5.4. Prometheus メトリクスを使用した Operator の監視
AMQ Streams の operator は Prometheus メトリクスを公開します。メトリクスは自動で有効になり、以下の情報が含まれます。
- 調整の数
- operator が処理しているカスタムリソースの数
- 調整の期間
- operator からの JVM メトリクス
この他に、Grafana ダッシュボードのサンプルが提供されます。
詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「AMQ Streams のメトリクスおよびダッシュボードの設定」を参照してください。