Chapter 13. Using AMQ Streams Operators
Use the AMQ Streams operators to manage your Kafka cluster, and Kafka topics and users.
13.1. Watching namespaces with AMQ Streams operators
Operators watch and manage AMQ Streams resources in namespaces. The Cluster Operator can watch a single namespace, multiple namespaces, or all namespaces in an OpenShift cluster. The Topic Operator and User Operator can watch a single namespace.
- The Cluster Operator watches for Kafka resources
- The Topic Operator watches for KafkaTopic resources
- The User Operator watches for KafkaUser resources
The Topic Operator and the User Operator can each watch only a single Kafka cluster in a namespace, and each can be connected to only a single Kafka cluster.
If multiple Topic Operators watch the same namespace, name collisions and topic deletion can occur. This is because each Kafka cluster uses Kafka topics that have the same name (such as __consumer_offsets). Make sure that only one Topic Operator watches a given namespace.
When using multiple User Operators with a single namespace, a user with a given username can exist in more than one Kafka cluster.
If you deploy the Topic Operator and User Operator using the Cluster Operator, they watch the Kafka cluster deployed by the Cluster Operator by default. You can also specify a namespace using watchedNamespace in the operator configuration.
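As a minimal sketch of the watchedNamespace setting, the following fragment assumes the operators are deployed through the entityOperator section of a Kafka resource. The API version, cluster name, and namespace names are assumptions for illustration, and the broker and ZooKeeper configuration is omitted.

```yaml
apiVersion: kafka.strimzi.io/v1beta2   # assumed API version
kind: Kafka
metadata:
  name: my-cluster                     # hypothetical cluster name
spec:
  # kafka and zookeeper configuration omitted from this sketch
  entityOperator:
    topicOperator:
      # Namespace in which the Topic Operator watches KafkaTopic resources
      watchedNamespace: my-topic-namespace   # hypothetical namespace
    userOperator:
      # Namespace in which the User Operator watches KafkaUser resources
      watchedNamespace: my-user-namespace    # hypothetical namespace
```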
For a standalone deployment of each operator, you specify a namespace and connection to the Kafka cluster to watch in the configuration.
13.2. Using the Cluster Operator
Use the Cluster Operator to deploy a Kafka cluster and other Kafka components.
13.2.1. Role-Based Access Control (RBAC) resources
The Cluster Operator creates and manages RBAC resources for AMQ Streams components that need access to OpenShift resources.
For the Cluster Operator to function, it needs permission within the OpenShift cluster to interact with Kafka resources, such as Kafka and KafkaConnect, as well as managed resources like ConfigMap, Pod, Deployment, StatefulSet, and Service.
Permission is specified through OpenShift role-based access control (RBAC) resources:
- ServiceAccount
- Role and ClusterRole
- RoleBinding and ClusterRoleBinding
13.2.1.1. Delegating privileges to AMQ Streams components
The Cluster Operator runs under a service account called strimzi-cluster-operator. It is assigned cluster roles that give it permission to create the RBAC resources for AMQ Streams components. Role bindings associate the cluster roles with the service account.
OpenShift prevents components operating under one ServiceAccount from granting another ServiceAccount privileges that the granting ServiceAccount does not have. Because the Cluster Operator creates the RoleBinding and ClusterRoleBinding RBAC resources needed by the resources it manages, it requires a role that gives it the same privileges.
The following tables describe the RBAC resources created by the Cluster Operator.
Name | Used by |
---|---|
| Kafka broker pods |
| ZooKeeper pods |
| Kafka Connect pods |
| MirrorMaker pods |
| MirrorMaker 2 pods |
| Kafka Bridge pods |
| Entity Operator |
Name | Used by |
---|---|
| Cluster Operator |
| Cluster Operator |
| Cluster Operator |
| Cluster Operator, rack feature (when used) |
| Cluster Operator, Topic Operator, User Operator |
| Cluster Operator, Kafka clients for rack awareness |
Name | Used by |
---|---|
| Cluster Operator |
| Cluster Operator, Kafka brokers for rack awareness |
| Cluster Operator, Kafka clients for rack awareness |
Name | Used by |
---|---|
| Cluster Operator |
| Cluster Operator, Kafka brokers for rack awareness |
13.2.1.2. Running the Cluster Operator using a ServiceAccount
The Cluster Operator is best run using a ServiceAccount:
Example ServiceAccount for the Cluster Operator
apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
The Deployment of the operator then needs to specify this in its spec.template.spec.serviceAccountName:
Partial example of Deployment for the Cluster Operator
apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
spec:
  replicas: 1
  selector:
    matchLabels:
      name: strimzi-cluster-operator
      strimzi.io/kind: cluster-operator
  template:
    # ...
In the elided template section, strimzi-cluster-operator is specified as the serviceAccountName.
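The part of the Deployment that the partial example elides might look like the following sketch. Only the serviceAccountName value and the labels come from the text above; the container name and the image placeholder are assumptions for illustration.

```yaml
spec:
  template:
    metadata:
      labels:
        # These labels must match spec.selector.matchLabels
        name: strimzi-cluster-operator
        strimzi.io/kind: cluster-operator
    spec:
      # The pods of the operator run under this service account
      serviceAccountName: strimzi-cluster-operator
      containers:
        - name: strimzi-cluster-operator   # assumed container name
          image: <operator-image>          # placeholder, not a real image reference
```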
13.2.1.3. ClusterRole resources
The Cluster Operator uses ClusterRole resources to provide the necessary access to resources. Depending on the OpenShift cluster setup, a cluster administrator might be needed to create the cluster roles.
Cluster administrator rights are only needed for the creation of ClusterRole resources. The Cluster Operator will not run under a cluster admin account.
ClusterRole resources follow the principle of least privilege and contain only those privileges needed by the Cluster Operator to operate the cluster of the Kafka component. The first set of assigned privileges allows the Cluster Operator to manage OpenShift resources such as StatefulSet, Deployment, Pod, and ConfigMap.
All cluster roles are required by the Cluster Operator in order to delegate privileges.
The Cluster Operator uses the strimzi-cluster-operator-namespaced and strimzi-cluster-operator-global cluster roles to grant permission at the namespace-scoped resources level and cluster-scoped resources level.
ClusterRole with namespaced resources for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-namespaced
  labels:
    app: strimzi
rules:
  # Resources in this role are used by the operator based on an operand being deployed in some namespace. When needed, you
  # can deploy the operator as a cluster-wide operator. But grant the rights listed in this role only on the namespaces
  # where the operands will be deployed. That way, you can limit the access the operator has to other namespaces where it
  # does not manage any clusters.
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to access and manage rolebindings to grant Strimzi components cluster permissions
      - rolebindings
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to access and manage roles to grant the entity operator permissions
      - roles
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      # The cluster operator needs to access and delete pods, this is to allow it to monitor pod health and coordinate rolling updates
      - pods
      # The cluster operator needs to access and manage service accounts to grant Strimzi components cluster permissions
      - serviceaccounts
      # The cluster operator needs to access and manage config maps for Strimzi components configuration
      - configmaps
      # The cluster operator needs to access and manage services and endpoints to expose Strimzi components to network traffic
      - services
      - endpoints
      # The cluster operator needs to access and manage secrets to handle credentials
      - secrets
      # The cluster operator needs to access and manage persistent volume claims to bind them to Strimzi components for persistent data
      - persistentvolumeclaims
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "apps"
    resources:
      # The cluster operator needs to access and manage deployments to run deployment based Strimzi components
      - deployments
      - deployments/scale
      - deployments/status
      # The cluster operator needs to access and manage stateful sets to run stateful sets based Strimzi components
      - statefulsets
      # The cluster operator needs to access replica-sets to manage Strimzi components and to determine error states
      - replicasets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "" # legacy core events api, used by topic operator
      - "events.k8s.io" # new events api, used by cluster operator
    resources:
      # The cluster operator needs to be able to create events and delegate permissions to do so
      - events
    verbs:
      - create
  - apiGroups:
      # Kafka Connect Build on OpenShift requirement
      - build.openshift.io
    resources:
      - buildconfigs
      - buildconfigs/instantiate
      - builds
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - networking.k8s.io
    resources:
      # The cluster operator needs to access and manage network policies to lock down communication between Strimzi components
      - networkpolicies
      # The cluster operator needs to access and manage ingresses which allow external access to the services in a cluster
      - ingresses
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - route.openshift.io
    resources:
      # The cluster operator needs to access and manage routes to expose Strimzi components for external access
      - routes
      - routes/custom-host
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - image.openshift.io
    resources:
      # The cluster operator needs to verify the image stream when used for Kafka Connect image build
      - imagestreams
    verbs:
      - get
  - apiGroups:
      - policy
    resources:
      # The cluster operator needs to access and manage pod disruption budgets this limits the number of concurrent disruptions
      # that a Strimzi component experiences, allowing for higher availability
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
ClusterRole with cluster-scoped resources for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-global
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to create and manage cluster role bindings in the case of an install where a user
      # has specified they want their cluster role bindings generated
      - clusterrolebindings
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - storage.k8s.io
    resources:
      # The cluster operator requires "get" permissions to view storage class details
      # This is because only a persistent volume of a supported storage class type can be resized
      - storageclasses
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      # The cluster operator requires "list" permissions to view all nodes in a cluster
      # The listing is used to determine the node addresses when NodePort access is configured
      # These addresses are then exposed in the custom resource states
      - nodes
    verbs:
      - list
The strimzi-cluster-operator-leader-election cluster role represents the permissions needed for the leader election.
ClusterRole with leader election permissions
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-leader-election
  labels:
    app: strimzi
rules:
  - apiGroups:
      - coordination.k8s.io
    resources:
      # The cluster operator needs to access and manage leases for leader election
      # The "create" verb cannot be used with "resourceNames"
      - leases
    verbs:
      - create
  - apiGroups:
      - coordination.k8s.io
    resources:
      # The cluster operator needs to access and manage leases for leader election
      - leases
    resourceNames:
      # The default RBAC files give the operator only access to the Lease resource names strimzi-cluster-operator
      # If you want to use another resource name or resource namespace, you have to configure the RBAC resources accordingly
      - strimzi-cluster-operator
    verbs:
      - get
      - list
      - watch
      - delete
      - patch
      - update
The strimzi-kafka-broker cluster role represents the access needed by the init container in Kafka pods that use rack awareness.
A role binding named strimzi-<cluster_name>-kafka-init grants the <cluster_name>-kafka service account access to nodes within a cluster using the strimzi-kafka-broker role. If the rack feature is not used and the cluster is not exposed through nodeport, no binding is created.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka broker pods
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-broker
  labels:
    app: strimzi
rules:
  - apiGroups:
      - ""
    resources:
      # The Kafka Brokers require "get" permissions to view the node they are on
      # This information is used to generate a Rack ID that is used for High Availability configurations
      - nodes
    verbs:
      - get
The strimzi-entity-operator cluster role represents the access needed by the Topic Operator and User Operator.
The Topic Operator produces OpenShift events with status information, so the <cluster_name>-entity-operator service account is bound to the strimzi-entity-operator role, which grants this access via the strimzi-entity-operator role binding.
ClusterRole for the Cluster Operator allowing it to delegate access to events to the Topic and User Operators
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-entity-operator
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "kafka.strimzi.io"
    resources:
      # The entity operator runs the KafkaTopic assembly operator, which needs to access and manage KafkaTopic resources
      - kafkatopics
      - kafkatopics/status
      # The entity operator runs the KafkaUser assembly operator, which needs to access and manage KafkaUser resources
      - kafkausers
      - kafkausers/status
    verbs:
      - get
      - list
      - watch
      - create
      - patch
      - update
      - delete
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      # The entity operator needs to be able to create events
      - create
  - apiGroups:
      - ""
    resources:
      # The entity operator user-operator needs to access and manage secrets to store generated credentials
      - secrets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
The strimzi-kafka-client cluster role represents the access needed by Kafka clients that use rack awareness.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka client-based pods
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-client
  labels:
    app: strimzi
rules:
  - apiGroups:
      - ""
    resources:
      # The Kafka clients (Connect, Mirror Maker, etc.) require "get" permissions to view the node they are on
      # This information is used to generate a Rack ID (client.rack option) that is used for consuming from the closest
      # replicas when enabled
      - nodes
    verbs:
      - get
13.2.1.4. ClusterRoleBinding resources
The Cluster Operator uses ClusterRoleBinding and RoleBinding resources to associate its ClusterRole with its ServiceAccount. Cluster role bindings are required by cluster roles containing cluster-scoped resources.
Example ClusterRoleBinding for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-global
  apiGroup: rbac.authorization.k8s.io
Cluster role bindings are also needed for the cluster roles used in delegating privileges:
Example ClusterRoleBinding for the Cluster Operator and Kafka broker rack awareness
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-broker-delegation
  labels:
    app: strimzi
# The Kafka broker cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Kafka brokers.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-broker
  apiGroup: rbac.authorization.k8s.io
Example ClusterRoleBinding for the Cluster Operator and Kafka client rack awareness
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-client-delegation
  labels:
    app: strimzi
# The Kafka clients cluster role must be bound to the cluster operator service account so that it can delegate the
# cluster role to the Kafka clients using it for consuming from closest replica.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-client
  apiGroup: rbac.authorization.k8s.io
Cluster roles containing only namespaced resources are bound using role bindings only.
Example RoleBinding for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-namespaced
  apiGroup: rbac.authorization.k8s.io
Example RoleBinding for the Cluster Operator and Entity Operator delegation
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-entity-operator-delegation
  labels:
    app: strimzi
# The Entity Operator cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Entity Operator.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-entity-operator
  apiGroup: rbac.authorization.k8s.io
13.2.2. ConfigMap for Cluster Operator logging
Cluster Operator logging is configured through a ConfigMap named strimzi-cluster-operator.
A ConfigMap containing logging configuration is created when installing the Cluster Operator. This ConfigMap is described in the file install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml. You configure Cluster Operator logging by changing the data field log4j2.properties in this ConfigMap.
To update the logging configuration, you can edit the 050-ConfigMap-strimzi-cluster-operator.yaml file and then run the following command:
oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
Alternatively, edit the ConfigMap directly:
oc edit configmap strimzi-cluster-operator
To change how often the logging configuration is reloaded, set a time in seconds in the monitorInterval option in the created ConfigMap.
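As a rough illustration of where monitorInterval sits, the ConfigMap might look like the following sketch. The ConfigMap name and the log4j2.properties key come from the text above; the appender and logger settings are assumptions based on standard Log4j 2 properties syntax, not a copy of the file shipped with the Cluster Operator.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: strimzi-cluster-operator
data:
  log4j2.properties: |
    # Assumed example content; the installed file may differ.
    name = COConfig
    # Check this configuration for changes every 30 seconds
    monitorInterval = 30

    appender.console.type = Console
    appender.console.name = STDOUT
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = STDOUT
```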
If the ConfigMap is missing when the Cluster Operator is deployed, the default logging values are used.
If the ConfigMap is accidentally deleted after the Cluster Operator is deployed, the most recently loaded logging configuration is used. Create a new ConfigMap to load a new logging configuration.
Do not remove the monitorInterval option from the ConfigMap.
13.2.3. Configuring the Cluster Operator with environment variables
You can configure the Cluster Operator using environment variables. The supported environment variables are listed here.
The environment variables are specified for the container image of the Cluster Operator in its Deployment configuration file (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml).
STRIMZI_NAMESPACE
A comma-separated list of namespaces that the operator operates in. When not set, set to empty string, or set to *, the Cluster Operator operates in all namespaces.
The Cluster Operator deployment might use the downward API to set this automatically to the namespace the Cluster Operator is deployed in.
Example configuration for Cluster Operator namespaces
env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
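To watch a fixed set of namespaces instead, the same variable can presumably carry an explicit comma-separated list, as in this sketch. The namespace names are hypothetical, and the operator would still need the namespaced role bindings described earlier in each listed namespace.

```yaml
env:
  - name: STRIMZI_NAMESPACE
    # Hypothetical namespace names, separated by commas
    value: kafka-dev,kafka-test
```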
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
- Optional, default is 120000 ms. The interval between periodic reconciliations, in milliseconds.
STRIMZI_OPERATION_TIMEOUT_MS
- Optional, default 300000 ms. The timeout for internal operations, in milliseconds. Increase this value when using AMQ Streams on clusters where regular OpenShift operations take longer than usual (because of slow downloading of Docker images, for example).
STRIMZI_ZOOKEEPER_ADMIN_SESSION_TIMEOUT_MS
- Optional, default 10000 ms. The session timeout for the Cluster Operator’s ZooKeeper admin client, in milliseconds. Increase the value if ZooKeeper requests from the Cluster Operator are regularly failing due to timeout issues. There is a maximum allowed session time set on the ZooKeeper server side via the maxSessionTimeout config. By default, the maximum session timeout value is 20 times the default tickTime (2000 ms), which gives 40000 ms. If you require a higher timeout, change the maxSessionTimeout ZooKeeper server configuration value.
STRIMZI_OPERATIONS_THREAD_POOL_SIZE
- Optional, default 10. The worker thread pool size, which is used for various asynchronous and blocking operations that are run by the Cluster Operator.
STRIMZI_OPERATOR_NAME
- Optional, defaults to the pod’s hostname. The operator name identifies the AMQ Streams instance when emitting OpenShift events.
STRIMZI_OPERATOR_NAMESPACE
The name of the namespace where the Cluster Operator is running. Do not configure this variable manually. Use the downward API.
env:
  - name: STRIMZI_OPERATOR_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_OPERATOR_NAMESPACE_LABELS
Optional. The labels of the namespace where the AMQ Streams Cluster Operator is running. Use namespace labels to configure the namespace selector in network policies. Network policies allow the AMQ Streams Cluster Operator access only to the operands from the namespace with these labels. When not set, the namespace selector in network policies is configured to allow access to the Cluster Operator from any namespace in the OpenShift cluster.
env:
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
STRIMZI_LABELS_EXCLUSION_PATTERN
Optional, default regex pattern is ^app.kubernetes.io/(?!part-of).*. The regex exclusion pattern used to filter labels propagation from the main custom resource to its subresources. The labels exclusion filter is not applied to labels in template sections such as spec.kafka.template.pod.metadata.labels.
env:
  - name: STRIMZI_LABELS_EXCLUSION_PATTERN
    value: "^key1.*"
STRIMZI_CUSTOM_{COMPONENT_NAME}_LABELS
Optional. One or more custom labels to apply to all the pods created by the {COMPONENT_NAME} custom resource. The Cluster Operator labels the pods when the custom resource is created or is next reconciled.
Labels can be applied to the following components:
- KAFKA
- KAFKA_CONNECT
- KAFKA_CONNECT_BUILD
- ZOOKEEPER
- ENTITY_OPERATOR
- KAFKA_MIRROR_MAKER2
- KAFKA_MIRROR_MAKER
- CRUISE_CONTROL
- KAFKA_BRIDGE
- KAFKA_EXPORTER
STRIMZI_CUSTOM_RESOURCE_SELECTOR
Optional. The label selector to filter the custom resources handled by the Cluster Operator. The operator will operate only on those custom resources that have the specified labels set. Resources without these labels will not be seen by the operator. The label selector applies to Kafka, KafkaConnect, KafkaBridge, KafkaMirrorMaker, and KafkaMirrorMaker2 resources. KafkaRebalance and KafkaConnector resources are operated only when their corresponding Kafka and Kafka Connect clusters have the matching labels.
env:
  - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR
    value: label1=value1,label2=value2
STRIMZI_KAFKA_IMAGES
- Required. The mapping from the Kafka version to the corresponding Docker image containing a Kafka broker for that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example 3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0, 3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. This is used when a Kafka.spec.kafka.version property is specified but not the Kafka.spec.kafka.image in the Kafka resource.
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
- Optional, default registry.redhat.io/amq-streams/strimzi-rhel8-operator:2.4.0. The image name to use as default for the init container if no image is specified as the kafka-init-image in the Kafka resource. The init container is started before the broker for initial configuration work, such as rack support.
STRIMZI_KAFKA_CONNECT_IMAGES
- Required. The mapping from the Kafka version to the corresponding Docker image of Kafka Connect for that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example 3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0, 3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. This is used when a KafkaConnect.spec.version property is specified but not the KafkaConnect.spec.image.
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
- Required. The mapping from the Kafka version to the corresponding Docker image of MirrorMaker for that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example 3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0, 3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. This is used when a KafkaMirrorMaker.spec.version property is specified but not the KafkaMirrorMaker.spec.image.
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
- Optional, default registry.redhat.io/amq-streams/strimzi-rhel8-operator:2.4.0. The image name to use as the default when deploying the Topic Operator if no image is specified as the Kafka.spec.entityOperator.topicOperator.image in the Kafka resource.
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
- Optional, default registry.redhat.io/amq-streams/strimzi-rhel8-operator:2.4.0. The image name to use as the default when deploying the User Operator if no image is specified as the Kafka.spec.entityOperator.userOperator.image in the Kafka resource.
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
- Optional, default registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. The image name to use as the default when deploying the sidecar container for the Entity Operator if no image is specified as the Kafka.spec.entityOperator.tlsSidecar.image in the Kafka resource. The sidecar provides TLS support.
STRIMZI_IMAGE_PULL_POLICY
- Optional. The ImagePullPolicy that is applied to containers in all pods managed by the Cluster Operator. The valid values are Always, IfNotPresent, and Never. If not specified, the OpenShift defaults are used. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
STRIMZI_IMAGE_PULL_SECRETS
- Optional. A comma-separated list of Secret names. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are specified in the imagePullSecrets property for all pods created by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
STRIMZI_KUBERNETES_VERSION
Optional. Overrides the OpenShift version information detected from the API server.
Example configuration for OpenShift version override
env:
  - name: STRIMZI_KUBERNETES_VERSION
    value: |
      major=1
      minor=16
      gitVersion=v1.16.2
      gitCommit=c97fe5036ef3df2967d086711e6c0c405941e14b
      gitTreeState=clean
      buildDate=2019-10-15T19:09:08Z
      goVersion=go1.12.10
      compiler=gc
      platform=linux/amd64
KUBERNETES_SERVICE_DNS_DOMAIN
Optional. Overrides the default OpenShift DNS domain name suffix.
By default, services assigned in the OpenShift cluster have a DNS domain name that uses the default suffix cluster.local.
For example, for broker kafka-0:
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local
The DNS domain name is added to the Kafka broker certificates used for hostname verification.
If you are using a different DNS domain name suffix in your cluster, change the KUBERNETES_SERVICE_DNS_DOMAIN environment variable from the default to the one you are using in order to establish a connection with the Kafka brokers.
STRIMZI_CONNECT_BUILD_TIMEOUT_MS
- Optional, default 300000 ms. The timeout for building new Kafka Connect images with additional connectors, in milliseconds. Consider increasing this value when using AMQ Streams to build container images containing many connectors or using a slow container registry.
STRIMZI_NETWORK_POLICY_GENERATION
Optional, default true. Controls whether network policies are generated for resources. Network policies allow connections between Kafka components.
Set this environment variable to false to disable network policy generation. You might do this, for example, if you want to use custom network policies. Custom network policies allow more control over maintaining the connections between components.
STRIMZI_DNS_CACHE_TTL
- Optional, default 30. Number of seconds to cache successful name lookups in local DNS resolver. Any negative value means cache forever. Zero means do not cache, which can be useful for avoiding connection errors due to long caching policies being applied.
STRIMZI_POD_SET_RECONCILIATION_ONLY
- Optional, default false. When set to true, the Cluster Operator reconciles only the StrimziPodSet resources and any changes to the other custom resources (Kafka, KafkaConnect, and so on) are ignored. This mode is useful for ensuring that your pods are recreated if needed, but no other changes happen to the clusters.
STRIMZI_FEATURE_GATES
- Optional. Enables or disables the features and functionality controlled by feature gates.
STRIMZI_POD_SECURITY_PROVIDER_CLASS
- Optional. Configuration for the pluggable PodSecurityProvider class, which can be used to provide the security context configuration for Pods and containers.
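Several of the variables above take numeric or boolean settings. The following sketch shows how a few of them might be set together in the env section of the Cluster Operator container; the chosen values are illustrative assumptions rather than recommendations. Kubernetes requires environment variable values to be strings, so numbers and booleans are quoted.

```yaml
env:
  # Double the 300000 ms default for slow clusters (illustrative value)
  - name: STRIMZI_OPERATION_TIMEOUT_MS
    value: "600000"
  # Disable generated network policies, for example to use custom ones
  - name: STRIMZI_NETWORK_POLICY_GENERATION
    value: "false"
  # Zero disables caching of successful DNS lookups
  - name: STRIMZI_DNS_CACHE_TTL
    value: "0"
```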
13.2.3.1. Leader election environment variables
Use leader election environment variables when running additional Cluster Operator replicas. You might run additional replicas to safeguard against disruption caused by major failure.
STRIMZI_LEADER_ELECTION_ENABLED
- Optional, disabled (false) by default. Enables or disables leader election, which allows additional Cluster Operator replicas to run on standby.
Leader election is disabled by default. It is only enabled when applying this environment variable on installation.
STRIMZI_LEADER_ELECTION_LEASE_NAME
- Required when leader election is enabled. The name of the OpenShift Lease resource that is used for the leader election.
STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
Required when leader election is enabled. The namespace where the OpenShift Lease resource used for leader election is created. You can use the downward API to configure it to the namespace where the Cluster Operator is deployed.
env:
  - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_LEADER_ELECTION_IDENTITY
Required when leader election is enabled. Configures the identity of a given Cluster Operator instance used during the leader election. The identity must be unique for each operator instance. You can use the downward API to configure it to the name of the pod where the Cluster Operator is deployed.
env:
  - name: STRIMZI_LEADER_ELECTION_IDENTITY
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
STRIMZI_LEADER_ELECTION_LEASE_DURATION_MS
- Optional, default 15000 ms. Specifies the duration the acquired lease is valid.
STRIMZI_LEADER_ELECTION_RENEW_DEADLINE_MS
- Optional, default 10000 ms. Specifies the period the leader should try to maintain leadership.
STRIMZI_LEADER_ELECTION_RETRY_PERIOD_MS
- Optional, default 2000 ms. Specifies the frequency of updates to the lease lock by the leader.
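Put together, the required leader election variables might be combined in the Cluster Operator Deployment as in the following sketch. The replica count and container name are assumptions; the lease name matches the resource name permitted by the default leader election cluster role shown earlier.

```yaml
spec:
  replicas: 3                              # assumed number of operator replicas
  template:
    spec:
      containers:
        - name: strimzi-cluster-operator   # assumed container name
          env:
            - name: STRIMZI_LEADER_ELECTION_ENABLED
              value: "true"
            # Must match the lease name allowed by the RBAC rules
            - name: STRIMZI_LEADER_ELECTION_LEASE_NAME
              value: strimzi-cluster-operator
            # Create the lease in the namespace of the operator
            - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            # The pod name gives each replica a unique identity
            - name: STRIMZI_LEADER_ELECTION_IDENTITY
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
```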
13.2.3.2. Restricting Cluster Operator access with network policy
Use the STRIMZI_OPERATOR_NAMESPACE_LABELS
environment variable to establish network policy for the Cluster Operator using namespace labels.
The Cluster Operator can run in the same namespace as the resources it manages, or in a separate namespace. By default, the STRIMZI_OPERATOR_NAMESPACE
environment variable is configured to use the downward API to find the namespace the Cluster Operator is running in. If the Cluster Operator is running in the same namespace as the resources, only local access is required and allowed by AMQ Streams.
If the Cluster Operator is running in a separate namespace to the resources it manages, any namespace in the OpenShift cluster is allowed access to the Cluster Operator unless network policy is configured. By adding namespace labels, access to the Cluster Operator is restricted to the namespaces specified.
Network policy configured for the Cluster Operator deployment
#...
env:
  # ...
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
#...
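The value of STRIMZI_OPERATOR_NAMESPACE_LABELS is a comma-separated list of key=value pairs. The following Python sketch shows one way to read such a value into a mapping. It illustrates the format only and is not the Cluster Operator's own parsing code; the function name parse_namespace_labels is hypothetical.

```python
def parse_namespace_labels(value):
    """Split 'k1=v1,k2=v2' into {'k1': 'v1', 'k2': 'v2'}.

    Hypothetical helper for illustration; not part of AMQ Streams.
    """
    labels = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate empty input or a trailing comma
        key, sep, val = pair.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"expected key=value, got {pair!r}")
        labels[key.strip()] = val.strip()
    return labels


print(parse_namespace_labels("label1=value1,label2=value2"))
```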
13.2.3.3. Setting the time interval for periodic reconciliation
Use the STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
variable to set the time interval for periodic reconciliations.
The Cluster Operator reacts to all notifications about applicable cluster resources received from the OpenShift cluster. If the operator is not running, or if a notification is not received for any reason, resources can get out of sync with the state of the running OpenShift cluster. To handle failovers properly, the Cluster Operator runs a periodic reconciliation that compares the state of the resources with the current cluster deployments and brings them back to a consistent state.
13.2.4. Configuring the Cluster Operator with default proxy settings
If you are running a Kafka cluster behind an HTTP proxy, you can still pass data in and out of the cluster. For example, you can run Kafka Connect with connectors that push and pull data from outside the proxy. Or you can use a proxy to connect with an authorization server.
Configure the Cluster Operator deployment to specify the proxy environment variables. The Cluster Operator accepts standard proxy configuration (HTTP_PROXY
, HTTPS_PROXY
and NO_PROXY
) as environment variables. The proxy settings are applied to all AMQ Streams containers.
The format for a proxy address is http://IP-ADDRESS:PORT-NUMBER. To set up a proxy with a name and password, the format is http://USERNAME:PASSWORD@IP-ADDRESS:PORT-NUMBER.
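As a minimal sketch of the two address formats above, the following Python helper assembles a proxy address with or without credentials. The helper name and the sample host, port, and credentials are hypothetical. Percent-encoding the user name and password is an assumption based on general URL rules, not something this guide specifies, so check what your proxy expects.

```python
from urllib.parse import quote


def proxy_address(host, port, username=None, password=None, scheme="http"):
    """Build SCHEME://[USERNAME[:PASSWORD]@]HOST:PORT (hypothetical helper)."""
    credentials = ""
    if username is not None:
        # Assumption: reserved characters in credentials are percent-encoded.
        credentials = quote(username, safe="")
        if password is not None:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    return f"{scheme}://{credentials}{host}:{port}"


print(proxy_address("10.0.0.1", 3128))
print(proxy_address("10.0.0.1", 3128, "alice", "p@ss:word"))
```

The resulting strings are the kind of value you would set for HTTP_PROXY or HTTPS_PROXY in the Cluster Operator deployment.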
Prerequisites
-
You need an account with permission to create and manage
CustomResourceDefinition
and RBAC (ClusterRole
and RoleBinding
) resources.
Procedure
To add proxy environment variables to the Cluster Operator, update its
Deployment
configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
).
Example proxy configuration for the Cluster Operator
apiVersion: apps/v1
kind: Deployment
spec:
  # ...
  template:
    spec:
      serviceAccountName: strimzi-cluster-operator
      containers:
        # ...
        env:
        # ...
        - name: "HTTP_PROXY"
          value: "http://proxy.com"
        - name: "HTTPS_PROXY"
          value: "https://proxy.com"
        - name: "NO_PROXY"
          value: "internal.com, other.domain.com"
  # ...
Alternatively, edit the
Deployment
directly:
oc edit deployment strimzi-cluster-operator
If you updated the YAML file instead of editing the
Deployment
directly, apply the changes:
oc create -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
13.2.5. Running multiple Cluster Operator replicas with leader election
The default Cluster Operator configuration enables leader election. Use leader election to run multiple parallel replicas of the Cluster Operator. One replica is elected as the active leader and operates the deployed resources. The other replicas run in standby mode. When the leader stops or fails, one of the standby replicas is elected as the new leader and starts operating the deployed resources.
By default, AMQ Streams runs with a single Cluster Operator replica that is always the leader replica. When a single Cluster Operator replica stops or fails, OpenShift starts a new replica.
Running the Cluster Operator with multiple replicas is not essential. But it’s useful to have replicas on standby in case of large-scale disruptions. For example, suppose multiple worker nodes or an entire availability zone fails. This failure might cause the Cluster Operator pod and many Kafka pods to go down at the same time. If subsequent pod scheduling causes congestion through lack of resources, this can delay operations when running a single Cluster Operator.
13.2.5.1. Configuring Cluster Operator replicas
To run additional Cluster Operator replicas in standby mode, you will need to increase the number of replicas and enable leader election. To configure leader election, use the leader election environment variables.
To make the required changes, configure the following Cluster Operator installation files located in install/cluster-operator/
:
- 060-Deployment-strimzi-cluster-operator.yaml
- 022-ClusterRole-strimzi-cluster-operator-role.yaml
- 022-RoleBinding-strimzi-cluster-operator.yaml
Leader election has its own ClusterRole
and RoleBinding
RBAC resources that target the namespace where the Cluster Operator is running, rather than the namespace it is watching.
The default deployment configuration creates a Lease
resource called strimzi-cluster-operator
in the same namespace as the Cluster Operator. The Cluster Operator uses leases to manage leader election. The RBAC resources provide the permissions to use the Lease
resource. If you use a different Lease
name or namespace, update the ClusterRole
and RoleBinding
files accordingly.
Prerequisites
-
You need an account with permission to create and manage
CustomResourceDefinition
and RBAC (ClusterRole
and RoleBinding
) resources.
Procedure
Edit the Deployment
resource that is used to deploy the Cluster Operator, which is defined in the 060-Deployment-strimzi-cluster-operator.yaml
file.
Change the
replicas
property from the default (1) to a value that matches the required number of replicas.
Increasing the number of Cluster Operator replicas
apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
spec:
  replicas: 3
Check that the leader election
env
properties are set.
If they are not set, configure them.
To enable leader election,
STRIMZI_LEADER_ELECTION_ENABLED
must be set to true (default).
In this example, the name of the lease is changed to my-strimzi-cluster-operator.
Configuring leader election environment variables for the Cluster Operator
# ...
spec:
  containers:
    - name: strimzi-cluster-operator
      # ...
      env:
        - name: STRIMZI_LEADER_ELECTION_ENABLED
          value: "true"
        - name: STRIMZI_LEADER_ELECTION_LEASE_NAME
          value: "my-strimzi-cluster-operator"
        - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: STRIMZI_LEADER_ELECTION_IDENTITY
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
For a description of the available environment variables, see Section 13.2.3.1, “Leader election environment variables”.
If you specified a different name or namespace for the
Lease
resource used in leader election, update the RBAC resources.
(optional) Edit the ClusterRole resource in the 022-ClusterRole-strimzi-cluster-operator-role.yaml file.
Update resourceNames with the name of the Lease resource.
Updating the ClusterRole references to the lease
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-leader-election
  labels:
    app: strimzi
rules:
  - apiGroups:
      - coordination.k8s.io
    resourceNames:
      - my-strimzi-cluster-operator
# ...
(optional) Edit the RoleBinding resource in the 022-RoleBinding-strimzi-cluster-operator.yaml file.
Update subjects.name and subjects.namespace with the name of the Lease resource and the namespace where it was created.
Updating the RoleBinding references to the lease
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-leader-election
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: my-strimzi-cluster-operator
    namespace: myproject
# ...
Deploy the Cluster Operator:
oc create -f install/cluster-operator -n myproject
Check the status of the deployment:
oc get deployments -n myproject
Output shows the deployment name and readiness
NAME                       READY   UP-TO-DATE   AVAILABLE
strimzi-cluster-operator   3/3     3            3
READY
shows the number of replicas that are ready/expected. The deployment is successful when the AVAILABLE
output shows the correct number of replicas.
13.2.6. FIPS support
Federal Information Processing Standards (FIPS) are standards for computer security and interoperability. When running AMQ Streams on a FIPS-enabled OpenShift cluster, the OpenJDK used in AMQ Streams container images automatically switches to FIPS mode. From version 2.4, AMQ Streams can run on FIPS-enabled OpenShift clusters without any changes or special configuration. It uses only the FIPS-compliant security libraries from the OpenJDK.
Minimum password length
When running in the FIPS mode, SCRAM-SHA-512 passwords need to be at least 32 characters long. From AMQ Streams 2.4, the default password length in AMQ Streams User Operator is set to 32 characters as well. If you have a Kafka cluster with custom configuration that uses a password length that is less than 32 characters, you need to update your configuration. If you have any users with passwords shorter than 32 characters, you need to regenerate a password with the required length. You can do that, for example, by deleting the user secret and waiting for the User Operator to create a new password with the appropriate length.
If you are using FIPS-enabled OpenShift clusters, you may experience higher memory consumption compared to regular OpenShift clusters. To avoid any issues, we suggest increasing the memory request to at least 512Mi.
13.2.6.1. Disabling FIPS mode
AMQ Streams automatically switches to FIPS mode when running on a FIPS-enabled OpenShift cluster. Disable FIPS mode by setting the FIPS_MODE
environment variable to disabled
in the deployment configuration for the Cluster Operator. With FIPS mode disabled, AMQ Streams automatically disables FIPS in the OpenJDK for all components. With FIPS mode disabled, AMQ Streams is not FIPS compliant. The AMQ Streams operators, as well as all operands, run in the same way as if they were running on an OpenShift cluster without FIPS enabled.
Procedure
To disable the FIPS mode in the Cluster Operator, update its
Deployment
configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
) and add the FIPS_MODE environment variable.
Example FIPS configuration for the Cluster Operator
apiVersion: apps/v1
kind: Deployment
spec:
  # ...
  template:
    spec:
      serviceAccountName: strimzi-cluster-operator
      containers:
        # ...
        env:
        # ...
        - name: "FIPS_MODE"
          value: "disabled" # 1
  # ...
- 1
- Disables the FIPS mode.
Alternatively, edit the
Deployment
directly:
oc edit deployment strimzi-cluster-operator
If you updated the YAML file instead of editing the
Deployment
directly, apply the changes:
oc apply -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
13.3. Using the Topic Operator
When you create, modify or delete a topic using the KafkaTopic
resource, the Topic Operator ensures those changes are reflected in the Kafka cluster.
For more information on the KafkaTopic
resource, see the KafkaTopic
schema reference.
Deploying the Topic Operator
You can deploy the Topic Operator using the Cluster Operator or as a standalone operator. You would use a standalone Topic Operator with a Kafka cluster that is not managed by the Cluster Operator.
To deploy the standalone Topic Operator, you need to set environment variables to connect to a Kafka cluster. These environment variables do not need to be set if you are deploying the Topic Operator using the Cluster Operator as they will be set by the Cluster Operator.
13.3.1. Kafka topic resource
The KafkaTopic
resource is used to configure topics, including the number of partitions and replicas.
The full schema for KafkaTopic
is described in KafkaTopic
schema reference.
13.3.1.1. Identifying a Kafka cluster for topic handling
A KafkaTopic
resource includes a label that specifies the name of the Kafka cluster (derived from the name of the Kafka
resource) to which it belongs.
For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
  labels:
    strimzi.io/cluster: my-cluster
The label is used by the Topic Operator to identify the KafkaTopic
resource and create a new topic, and also in subsequent handling of the topic.
If the label does not match the Kafka cluster, the Topic Operator cannot identify the KafkaTopic
and the topic is not created.
13.3.1.2. Kafka topic usage recommendations
When working with topics, be consistent. Always operate on either KafkaTopic
resources in OpenShift or on topics directly in Kafka. Avoid routinely switching between both methods for a given topic.
Use topic names that reflect the nature of the topic, and remember that names cannot be changed later.
If creating a topic in Kafka, use a name that is a valid OpenShift resource name, otherwise the Topic Operator will need to create the corresponding KafkaTopic
with a name that conforms to the OpenShift rules.
For information on the requirements for identifiers and names in OpenShift, refer to Object Names and IDs.
13.3.1.3. Kafka topic naming conventions
Kafka and OpenShift impose their own validation rules for the naming of topics in Kafka and KafkaTopic.metadata.name
respectively. There are valid names for each which are invalid in the other.
Using the spec.topicName
property, it is possible to create a valid topic in Kafka with a name that would be invalid for the KafkaTopic resource in OpenShift.
The spec.topicName
property inherits Kafka naming validation rules:
- The name must not be longer than 249 characters.
- Valid characters for Kafka topics are ASCII alphanumerics, ., _, and -.
- The name cannot be . or .., though . can be used in a name, such as exampleTopic. or .exampleTopic.
spec.topicName
must not be changed.
For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: topicName-1 # 1
  # ...
- 1
- Upper case is invalid in OpenShift.
cannot be changed to:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: name-2
  # ...
Some Kafka client applications, such as Kafka Streams, can create topics in Kafka programmatically. If those topics have names that are invalid OpenShift resource names, the Topic Operator gives them a valid metadata.name
based on the Kafka name. Invalid characters are replaced and a hash is appended to the name. For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mytopic---c55e57fe2546a33f9e603caf57165db4072e827e
spec:
  topicName: myTopic
  # ...
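The two sets of naming rules can be compared with a short Python sketch. The Kafka check follows the rules listed in this section. The OpenShift check assumes that metadata.name must be a DNS subdomain name as described in Object Names and IDs (lowercase alphanumerics, - and ., starting and ending with an alphanumeric, at most 253 characters). Both function names are hypothetical, and the sketch does not reproduce how the Topic Operator derives a replacement name.

```python
import re

# Kafka rule from this section: ASCII alphanumerics, '.', '_' and '-'.
KAFKA_TOPIC_CHARS = re.compile(r"[A-Za-z0-9._-]+")

# Assumption: metadata.name follows the DNS subdomain rules (RFC 1123).
DNS_SUBDOMAIN = re.compile(
    r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
)


def is_valid_kafka_topic_name(name):
    """True if the name satisfies the Kafka topic rules listed above."""
    return (
        0 < len(name) <= 249
        and name not in (".", "..")
        and KAFKA_TOPIC_CHARS.fullmatch(name) is not None
    )


def is_valid_resource_name(name):
    """True if the name could be used as metadata.name (assumed rules)."""
    return len(name) <= 253 and DNS_SUBDOMAIN.fullmatch(name) is not None


for candidate in ("topic-name-1", "topicName-1", "my_topic", ".."):
    print(candidate,
          is_valid_kafka_topic_name(candidate),
          is_valid_resource_name(candidate))
```

A name such as topicName-1 passes the Kafka check but fails the resource-name check, which is the case that spec.topicName exists to handle.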
13.3.2. Topic Operator topic store
The Topic Operator uses Kafka to store topic metadata describing topic configuration as key-value pairs. The topic store is based on the Kafka Streams key-value mechanism, which uses Kafka topics to persist the state.
Topic metadata is cached in-memory and accessed locally within the Topic Operator. Updates from operations applied to the local in-memory cache are persisted to a backup topic store on disk. The topic store is continually synchronized with updates from Kafka topics or OpenShift KafkaTopic
custom resources. Operations are handled rapidly with the topic store set up this way, but should the in-memory cache crash it is automatically repopulated from the persistent storage.
13.3.2.1. Internal topic store topics
Internal topics support the handling of topic metadata in the topic store.
__strimzi_store_topic
- Input topic for storing the topic metadata
__strimzi-topic-operator-kstreams-topic-store-changelog
- Retains a log of compacted topic store values
Do not delete these topics, as they are essential to the running of the Topic Operator.
13.3.2.2. Migrating topic metadata from ZooKeeper
In previous releases of AMQ Streams, topic metadata was stored in ZooKeeper. The new process removes this requirement, bringing the metadata into the Kafka cluster, and under the control of the Topic Operator.
When upgrading to AMQ Streams 2.4, the transition to Topic Operator control of the topic store is seamless. Metadata is found and migrated from ZooKeeper, and the old store is deleted.
13.3.2.3. Downgrading to an AMQ Streams version that uses ZooKeeper to store topic metadata
If you are reverting to a version of AMQ Streams earlier than 1.7, which uses ZooKeeper for the storage of topic metadata, you still downgrade your Cluster Operator to the previous version, and then downgrade the Kafka brokers and client applications to the previous Kafka version as standard.
However, you must also delete the topics that were created for the topic store using a kafka-admin
command, specifying the bootstrap address of the Kafka cluster. For example:
oc run kafka-admin -ti --image=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
The command must correspond to the type of listener and authentication used to access the Kafka cluster.
The Topic Operator will reconstruct the ZooKeeper topic metadata from the state of the topics in Kafka.
13.3.2.4. Topic Operator topic replication and scaling
The recommended configuration for topics managed by the Topic Operator is a topic replication factor of 3, and a minimum of 2 in-sync replicas.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10 # 1
  replicas: 3 # 2
  config:
    min.insync.replicas: 2 # 3
  #...
- 1
- The number of partitions for the topic.
- 2
- The number of replica topic partitions. Currently, this cannot be changed in the KafkaTopic resource, but it can be changed using the kafka-reassign-partitions.sh tool.
- 3
- The minimum number of replica partitions that a message must be successfully written to, or an exception is raised.
In-sync replicas are used in conjunction with the acks
configuration for producer applications. The acks
configuration determines the number of follower partitions a message must be replicated to before the message is acknowledged as successfully received. The Topic Operator runs with acks=all
, whereby messages must be acknowledged by all in-sync replicas.
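To see why a replication factor of 3 with a minimum of 2 in-sync replicas is a common pairing, consider how many replicas of a partition can be unavailable before a producer using acks=all can no longer write to it. The following Python sketch works through that arithmetic; the function name is hypothetical.

```python
def tolerated_replica_failures(replicas, min_insync_replicas):
    """Replicas that can be out of sync while acks=all writes still succeed.

    Hypothetical helper: writes are accepted as long as the number of
    in-sync replicas is at least min.insync.replicas.
    """
    if not 1 <= min_insync_replicas <= replicas:
        raise ValueError("min.insync.replicas must be between 1 and replicas")
    return replicas - min_insync_replicas


# The recommended configuration tolerates the loss of one replica.
print(tolerated_replica_failures(replicas=3, min_insync_replicas=2))
```

Setting min.insync.replicas equal to the replication factor leaves no headroom: a single unavailable replica would block writes to the partition.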
When scaling Kafka clusters by adding or removing brokers, replication factor configuration is not changed and replicas are not reassigned automatically. However, you can use the kafka-reassign-partitions.sh
tool to change the replication factor, and manually reassign replicas to brokers.
Alternatively, though the integration of Cruise Control for AMQ Streams cannot change the replication factor for topics, the optimization proposals it generates for rebalancing Kafka include commands that transfer partition replicas and change partition leadership.
13.3.2.5. Handling changes to topics
A fundamental problem that the Topic Operator needs to solve is that there is no single source of truth: both the KafkaTopic
resource and the Kafka topic can be modified independently of the Topic Operator. Complicating this, the Topic Operator might not always be able to observe changes at each end in real time, for example when the Topic Operator is down.
To resolve this, the Topic Operator maintains information about each topic in the topic store. When a change happens in the Kafka cluster or OpenShift, it looks at both the state of the other system and the topic store in order to determine what needs to change to keep everything in sync. The same thing happens whenever the Topic Operator starts, and periodically while it is running.
For example, suppose the Topic Operator is not running, and a KafkaTopic
called my-topic is created. When the Topic Operator starts, the topic store does not contain information on my-topic, so it can infer that the KafkaTopic
was created after it was last running. The Topic Operator creates the topic corresponding to my-topic, and also stores metadata for my-topic in the topic store.
If you update Kafka topic configuration or apply a change through the KafkaTopic
custom resource, the topic store is updated after the Kafka cluster is reconciled.
The topic store also allows the Topic Operator to manage scenarios where the topic configuration is changed in Kafka topics and updated through OpenShift KafkaTopic
custom resources, as long as the changes are not incompatible. For example, it is possible to make changes to the same topic config key, but to different values. For incompatible changes, the Kafka configuration takes priority, and the KafkaTopic
is updated accordingly.
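The reconciliation described in this section resembles a three-way merge, with the topic store acting as the common base. The following Python sketch applies that idea to topic configuration only. It is an illustration under simplifying assumptions, not the Topic Operator's implementation: the function name is hypothetical, configuration is modelled as flat dictionaries, and partitions, replicas, topic creation, and topic deletion are ignored.

```python
_MISSING = object()  # marks a key that is absent on one side


def three_way_merge(store, resource, kafka):
    """Merge topic config from the KafkaTopic resource and from Kafka.

    store    -- last known config recorded in the topic store (the base)
    resource -- config currently in the KafkaTopic resource
    kafka    -- config currently on the topic in Kafka
    """
    merged = {}
    for key in set(store) | set(resource) | set(kafka):
        base = store.get(key, _MISSING)
        from_resource = resource.get(key, _MISSING)
        from_kafka = kafka.get(key, _MISSING)
        if from_kafka != base:
            # Kafka changed this key. If the resource changed it too,
            # the Kafka value takes priority, as described above.
            chosen = from_kafka
        else:
            # Only the resource changed this key, or nobody did.
            chosen = from_resource
        if chosen is not _MISSING:
            merged[key] = chosen
    return merged


store = {"retention.ms": "1000", "cleanup.policy": "delete"}
resource = {"retention.ms": "2000", "cleanup.policy": "delete"}
kafka = {"retention.ms": "1000", "cleanup.policy": "compact"}
print(three_way_merge(store, resource, kafka))
```

In this run, the two sides changed different keys, so both changes are kept. If both sides had changed retention.ms to different values, the value from Kafka would be used.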
You can also use the KafkaTopic
resource to delete topics using an oc delete -f KAFKA-TOPIC-CONFIG-FILE
command. To be able to do this, delete.topic.enable
must be set to true
(default) in the spec.kafka.config
of the Kafka resource.
13.3.3. Configuring Kafka topics
Use the properties of the KafkaTopic
resource to configure Kafka topics.
You can use oc apply
to create or modify topics, and oc delete
to delete existing topics.
For example:
-
oc apply -f <topic_config_file>
-
oc delete KafkaTopic <topic_name>
This procedure shows how to create a topic with 10 partitions and 2 replicas.
Before you start
It is important that you consider the following before making your changes:
- Kafka does not support decreasing the number of partitions.
- Increasing spec.partitions for topics with keys will change how records are partitioned, which can be particularly problematic when the topic uses semantic partitioning.
- AMQ Streams does not support making the following changes through the KafkaTopic resource:
  - Using spec.replicas to change the number of replicas that were initially specified
  - Changing topic names using spec.topicName
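The effect of increasing the partition count on keyed records can be illustrated with a short Python sketch. It assumes a partitioner that hashes the key and takes the result modulo the number of partitions. CRC32 is used here only as a stand-in hash so that the example runs with the standard library; it is not the hash function that Kafka clients use, and the key names are made up.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a record key to a partition (stand-in hash, for illustration)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"customer-{i}" for i in range(100)]

# Compare the partition each key maps to before and after an increase.
moved = [k for k in keys if partition_for(k, 10) != partition_for(k, 12)]
print(f"{len(moved)} of {len(keys)} keys map to a different partition")
```

Records written after the increase can therefore land in a different partition from earlier records with the same key, which breaks any ordering or co-location that consumers relied on.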
Prerequisites
- A running Kafka cluster configured with a Kafka broker listener using mTLS authentication and TLS encryption.
- A running Topic Operator (typically deployed with the Entity Operator).
- For deleting a topic, delete.topic.enable=true (default) in the spec.kafka.config of the Kafka resource.
Procedure
Configure the
KafkaTopic
resource.
Example Kafka topic configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 2
Tip: When modifying a topic, you can get the current version of the resource using oc get kafkatopic orders -o yaml.
Create the KafkaTopic resource in OpenShift.
oc apply -f <topic_config_file>
Wait for the ready status of the topic to change to True:
oc get kafkatopics -o wide -w -n <namespace>
Kafka topic status
NAME         CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
my-topic-1   my-cluster   10           3                    True
my-topic-2   my-cluster   10           3
my-topic-3   my-cluster   10           3                    True
Topic creation is successful when the READY output shows True.
If the READY column stays blank, get more details on the status from the resource YAML or from the Topic Operator logs.
Messages provide details on the reason for the current status.
oc get kafkatopics my-topic-2 -o yaml
Details on a topic with a NotReady status
# ...
status:
  conditions:
    - lastTransitionTime: "2022-06-13T10:14:43.351550Z"
      message: Number of partitions cannot be decreased
      reason: PartitionDecreaseException
      status: "True"
      type: NotReady
In this example, the topic is not ready because the original number of partitions was reduced in the KafkaTopic configuration. Kafka does not support this.
After resetting the topic configuration, the status shows the topic is ready.
oc get kafkatopics my-topic-2 -o wide -w -n <namespace>
Status update of the topic
NAME         CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
my-topic-2   my-cluster   10           3                    True
Fetching the details shows no messages.
oc get kafkatopics my-topic-2 -o yaml
Details on a topic with a READY status
# ...
status:
  conditions:
    - lastTransitionTime: '2022-06-13T10:15:03.761084Z'
      status: 'True'
      type: Ready
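If you check readiness from a script rather than by eye, the status conditions shown in this procedure can be read programmatically. The following Python sketch works on a resource that has already been loaded into a dictionary, for example by parsing the output of oc get kafkatopics my-topic-2 -o json. The function name is hypothetical, and the sample data repeats the NotReady example from this procedure.

```python
def readiness(resource):
    """Return (ready, reason, message) from a resource's status conditions.

    Hypothetical helper; expects the resource as a plain dictionary.
    """
    conditions = resource.get("status", {}).get("conditions", [])
    for condition in conditions:
        if condition.get("status") != "True":
            continue
        if condition.get("type") == "Ready":
            return True, None, None
        if condition.get("type") == "NotReady":
            return False, condition.get("reason"), condition.get("message")
    # No condition reported yet, which matches a blank READY column.
    return False, None, None


not_ready_topic = {
    "status": {
        "conditions": [
            {
                "lastTransitionTime": "2022-06-13T10:14:43.351550Z",
                "message": "Number of partitions cannot be decreased",
                "reason": "PartitionDecreaseException",
                "status": "True",
                "type": "NotReady",
            }
        ]
    }
}

print(readiness(not_ready_topic))
```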
13.3.4. Configuring the Topic Operator with resource requests and limits
You can allocate resources, such as CPU and memory, to the Topic Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
Update the Kafka cluster configuration in an editor, as required:
oc edit kafka MY-CLUSTER
In the
spec.entityOperator.topicOperator.resources
property in the Kafka resource, set the resource requests and limits for the Topic Operator.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  # Kafka and ZooKeeper sections...
  entityOperator:
    topicOperator:
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi
Apply the new configuration to create or update the resource.
oc apply -f <kafka_configuration_file>
13.4. Using the User Operator
When you create, modify or delete a user using the KafkaUser
resource, the User Operator ensures those changes are reflected in the Kafka cluster.
For more information on the KafkaUser
resource, see the KafkaUser
schema reference.
Deploying the User Operator
You can deploy the User Operator using the Cluster Operator or as a standalone operator. You would use a standalone User Operator with a Kafka cluster that is not managed by the Cluster Operator.
To deploy the standalone User Operator, you need to set environment variables to connect to a Kafka cluster. These environment variables do not need to be set if you are deploying the User Operator using the Cluster Operator as they will be set by the Cluster Operator.
13.4.1. Configuring Kafka users
Use the properties of the KafkaUser
resource to configure Kafka users.
You can use oc apply
to create or modify users, and oc delete
to delete existing users.
For example:
-
oc apply -f <user_config_file>
-
oc delete KafkaUser <user_name>
Users represent Kafka clients. When you configure Kafka users, you enable the user authentication and authorization mechanisms required by clients to access Kafka. The mechanism used must match the equivalent Kafka
configuration. For more information on using Kafka
and KafkaUser
resources to secure access to Kafka brokers, see Securing access to Kafka brokers.
Prerequisites
- A running Kafka cluster configured with a Kafka broker listener using mTLS authentication and TLS encryption.
- A running User Operator (typically deployed with the Entity Operator).
Procedure
Configure the
KafkaUser
resource.
This example specifies mTLS authentication and simple authorization using ACLs.
Example Kafka user configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Example consumer Acls for topic my-topic using consumer group my-group
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Describe
          - Read
        host: "*"
      - resource:
          type: group
          name: my-group
          patternType: literal
        operations:
          - Read
        host: "*"
      # Example Producer Acls for topic my-topic
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Create
          - Describe
          - Write
        host: "*"
Create the
KafkaUser
resource in OpenShift.
oc apply -f <user_config_file>
Wait for the ready status of the user to change to True:
oc get kafkausers -o wide -w -n <namespace>
Kafka user status
NAME        CLUSTER      AUTHENTICATION   AUTHORIZATION   READY
my-user-1   my-cluster   tls              simple          True
my-user-2   my-cluster   tls              simple
my-user-3   my-cluster   tls              simple          True
User creation is successful when the READY output shows True.
If the READY column stays blank, get more details on the status from the resource YAML or User Operator logs.
Messages provide details on the reason for the current status.
oc get kafkausers my-user-2 -o yaml
Details on a user with a NotReady status
# ...
status:
  conditions:
    - lastTransitionTime: "2022-06-10T10:07:37.238065Z"
      message: Simple authorization ACL rules are configured but not supported in the Kafka cluster configuration.
      reason: InvalidResourceException
      status: "True"
      type: NotReady
In this example, the user is not ready because simple authorization is not enabled in the Kafka configuration.
Kafka configuration for simple authorization
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    authorization:
      type: simple
After updating the Kafka configuration, the status shows the user is ready.
oc get kafkausers my-user-2 -o wide -w -n <namespace>
Status update of the user
NAME        CLUSTER      AUTHENTICATION   AUTHORIZATION   READY
my-user-2   my-cluster   tls              simple          True
Fetching the details shows no messages.
oc get kafkausers my-user-2 -o yaml
Details on a user with a READY status
# ...
status:
  conditions:
    - lastTransitionTime: "2022-06-10T10:33:40.166846Z"
      status: "True"
      type: Ready
13.4.2. Configuring the User Operator with resource requests and limits
You can allocate resources, such as CPU and memory, to the User Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
Update the Kafka cluster configuration in an editor, as required:
oc edit kafka MY-CLUSTER
In the
spec.entityOperator.userOperator.resources
property in the Kafka resource, set the resource requests and limits for the User Operator.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  # Kafka and ZooKeeper sections...
  entityOperator:
    userOperator:
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi
Save the file and exit the editor. The Cluster Operator applies the changes automatically.
13.5. Configuring feature gates
AMQ Streams operators support feature gates to enable or disable certain features and functionality. Enabling a feature gate changes the behavior of the relevant operator and introduces the feature to your AMQ Streams deployment.
Feature gates have a default state of either enabled or disabled.
To modify a feature gate’s default state, use the STRIMZI_FEATURE_GATES
environment variable in the operator’s configuration. You can modify multiple feature gates using this single environment variable. Specify a comma-separated list of feature gate names and prefixes. A +
prefix enables the feature gate and a -
prefix disables it.
Example feature gate configuration that enables FeatureGate1
and disables FeatureGate2
env:
  - name: STRIMZI_FEATURE_GATES
    value: +FeatureGate1,-FeatureGate2
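The STRIMZI_FEATURE_GATES value is a comma-separated list in which each name carries a + or - prefix. The following Python sketch shows how such a value maps to enabled and disabled gates. It illustrates the format only; the function name is hypothetical and the sketch is not the operators' own parsing code.

```python
def parse_feature_gates(value):
    """Turn '+GateA,-GateB' into {'GateA': True, 'GateB': False}.

    Hypothetical helper for illustration; not part of AMQ Streams.
    """
    gates = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate an empty value or a trailing comma
        prefix, name = item[0], item[1:]
        if prefix not in ("+", "-") or not name:
            raise ValueError(f"expected +Name or -Name, got {item!r}")
        gates[name] = prefix == "+"
    return gates


print(parse_feature_gates("+FeatureGate1,-FeatureGate2"))
```

Gates that are not listed keep their default state, so the mapping only records the overrides.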
13.5.1. ControlPlaneListener feature gate
The ControlPlaneListener
feature gate has moved to GA, which means it is now permanently enabled and cannot be disabled. With ControlPlaneListener
enabled, the connections between the Kafka controller and brokers use an internal control plane listener on port 9090. Replication of data between brokers, as well as internal connections from AMQ Streams operators, Cruise Control, or the Kafka Exporter use the replication listener on port 9091.
With the ControlPlaneListener
feature gate permanently enabled, it is no longer possible to upgrade or downgrade directly between AMQ Streams 1.7 and earlier and AMQ Streams 2.3 and newer. You have to first upgrade or downgrade through one of the AMQ Streams versions in-between, disable the ControlPlaneListener
feature gate, and then downgrade or upgrade (with the feature gate enabled) to the target version.
13.5.2. ServiceAccountPatching feature gate
The ServiceAccountPatching
feature gate has moved to GA, which means it is now permanently enabled and cannot be disabled. With ServiceAccountPatching
enabled, the Cluster Operator always reconciles service accounts and updates them when needed. For example, when you change service account labels or annotations using the template
property of a custom resource, the operator automatically updates them on the existing service account resources.
13.5.3. UseStrimziPodSets feature gate
The UseStrimziPodSets
feature gate has a default state of enabled.
The UseStrimziPodSets
feature gate introduces a resource for managing pods called StrimziPodSet
. When the feature gate is enabled, this resource is used instead of the StatefulSets. AMQ Streams handles the creation and management of pods instead of OpenShift. Using StrimziPodSets instead of StatefulSets provides more control over the functionality.
When this feature gate is disabled, AMQ Streams relies on StatefulSets to create and manage pods for the ZooKeeper and Kafka clusters. AMQ Streams creates the StatefulSet and OpenShift creates the pods according to the StatefulSet definition. When a pod is deleted, OpenShift is responsible for recreating it. The use of StatefulSets has the following limitations:
- Pods are always created or removed based on their index numbers
- All pods in the StatefulSet need to have a similar configuration
- Changing storage configuration for the pods in the StatefulSet is complicated
Disabling the UseStrimziPodSets feature gate
To disable the UseStrimziPodSets feature gate, specify -UseStrimziPodSets in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
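As a minimal sketch, the environment variable could be set in the Cluster Operator Deployment as shown in the following fragment. The Deployment and container name strimzi-cluster-operator are assumptions based on the default installation files, so adjust them to match your deployment.

```yaml
# Fragment of the Cluster Operator Deployment (sketch, not a complete manifest).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator        # assumed default name
spec:
  template:
    spec:
      containers:
        - name: strimzi-cluster-operator   # assumed default container name
          env:
            # A leading "-" disables the named feature gate.
            - name: STRIMZI_FEATURE_GATES
              value: "-UseStrimziPodSets"
```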
The UseStrimziPodSets feature gate must be disabled when downgrading to AMQ Streams 2.0 and earlier versions.
13.5.4. (Preview) UseKRaft feature gate
The UseKRaft feature gate has a default state of disabled.
The UseKRaft feature gate deploys the Kafka cluster in KRaft (Kafka Raft metadata) mode without ZooKeeper. This feature gate is currently intended only for development and testing.
The KRaft mode is not ready for production in Apache Kafka or in AMQ Streams.
When the UseKRaft feature gate is enabled, the Kafka cluster is deployed without ZooKeeper. The .spec.zookeeper properties in the Kafka custom resource are ignored, but must still be present. The UseKRaft feature gate provides an API that configures Kafka cluster nodes and their roles. The API is still in development and is expected to change before the KRaft mode is production-ready.
Currently, the KRaft mode in AMQ Streams has the following major limitations:
- Moving from Kafka clusters with ZooKeeper to KRaft clusters or the other way around is not supported.
- Upgrades and downgrades of Apache Kafka versions or the AMQ Streams operator are not supported. Users might need to delete the cluster, upgrade the operator and deploy a new Kafka cluster.
- The Topic Operator is not supported. The spec.entityOperator.topicOperator property must be removed from the Kafka custom resource.
- SCRAM-SHA-512 authentication is not supported.
- JBOD storage is not supported. The type: jbod storage can be used, but the JBOD array can contain only one disk.
- All Kafka nodes have both the controller and broker KRaft roles. Kafka clusters with separate controller and broker nodes are not supported.
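The limitations above can be sketched as a Kafka custom resource that stays within them. This is an illustrative example for development and testing only; the cluster name, replica counts, listener, and storage sizes are hypothetical values.

```yaml
# Sketch of a Kafka resource for KRaft mode (development and testing only).
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster                # hypothetical cluster name
spec:
  kafka:
    replicas: 3                   # each node acts as both controller and broker
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    storage:
      type: jbod                  # allowed, but only with a single volume
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:                      # ignored in KRaft mode, but must still be present
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    userOperator: {}              # no topicOperator entry: the Topic Operator is not supported
```

Because SCRAM-SHA-512 authentication is not supported, the example listener does not configure it.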
Enabling the UseKRaft feature gate
To enable the UseKRaft feature gate, specify +UseKRaft in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
The UseKRaft feature gate depends on the UseStrimziPodSets feature gate. When enabling the UseKRaft feature gate, make sure that the UseStrimziPodSets feature gate is enabled as well.
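The following fragment sketches one way to make the dependency explicit by listing both feature gates, separated by a comma, in the same environment variable. Because UseStrimziPodSets is enabled by default, naming it here is optional and shown only for clarity. The container name is an assumption based on the default installation files.

```yaml
# Fragment of the Cluster Operator container configuration (sketch).
containers:
  - name: strimzi-cluster-operator      # assumed default container name
    env:
      # A leading "+" enables the named feature gate; entries are comma-separated.
      - name: STRIMZI_FEATURE_GATES
        value: "+UseStrimziPodSets,+UseKRaft"
```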
13.5.5. (Preview) StableConnectIdentities feature gate
The StableConnectIdentities feature gate has a default state of disabled.
The StableConnectIdentities feature gate uses StrimziPodSet resources to manage Kafka Connect and Kafka MirrorMaker 2 pods instead of using OpenShift Deployment resources. StrimziPodSets give the pods stable names and stable addresses, which do not change during rolling upgrades. This helps to minimize the number of rebalances of connector tasks.
Enabling the StableConnectIdentities feature gate
To enable the StableConnectIdentities feature gate, specify +StableConnectIdentities in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
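If the Cluster Operator was installed through the Operator Lifecycle Manager, the environment variable might be supplied through the Subscription resource instead of editing the Deployment directly. The following is a sketch under that assumption; the subscription name, namespace, channel, and catalog source are placeholders and may differ in your cluster.

```yaml
# Sketch of an OLM Subscription passing the feature gate to the operator.
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: amq-streams                     # assumed subscription name
  namespace: openshift-operators        # assumed namespace
spec:
  channel: stable                       # assumed channel
  name: amq-streams                     # assumed package name
  source: redhat-operators              # assumed catalog source
  sourceNamespace: openshift-marketplace
  config:
    env:
      # A leading "+" enables the named feature gate.
      - name: STRIMZI_FEATURE_GATES
        value: "+StableConnectIdentities"
```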
The StableConnectIdentities feature gate must be disabled when downgrading to AMQ Streams 2.3 and earlier versions.
13.5.6. Feature gate releases
Feature gates have three stages of maturity:
- Alpha — typically disabled by default
- Beta — typically enabled by default
- General Availability (GA) — typically always enabled
Alpha stage features might be experimental or unstable, subject to change, or not sufficiently tested for production use. Beta stage features are well tested and their functionality is not likely to change. GA stage features are stable and should not change in the future. Alpha and beta stage features are removed if they do not prove to be useful.
- The ControlPlaneListener feature gate moved to GA stage in AMQ Streams 2.3. It is now permanently enabled and cannot be disabled.
- The ServiceAccountPatching feature gate moved to GA stage in AMQ Streams 2.3. It is now permanently enabled and cannot be disabled.
- The UseStrimziPodSets feature gate moved to beta stage in AMQ Streams 2.3. It moves to GA in a future release of AMQ Streams when the support for StatefulSets is completely removed.
- The UseKRaft feature gate is available for development only and does not currently have a planned release for moving to the beta phase.
- The StableConnectIdentities feature gate is in alpha stage and is disabled by default.
Feature gates might be removed when they reach GA. This means that the feature was incorporated into the AMQ Streams core features and can no longer be disabled.
Feature gate | Alpha | Beta | GA |
---|---|---|---|
ControlPlaneListener | 1.8 | 2.0 | 2.3 |
ServiceAccountPatching | 1.8 | 2.0 | 2.3 |
UseStrimziPodSets | 2.1 | 2.3 | future release (planned) |
UseKRaft | 2.2 | - | - |
StableConnectIdentities | 2.4 | future release (planned) | - |
If a feature gate is enabled, you may need to disable it before upgrading or downgrading from a specific AMQ Streams version. The following table shows which feature gates you need to disable when upgrading or downgrading AMQ Streams versions.
Disable feature gate | Upgrading from AMQ Streams version | Downgrading to AMQ Streams version |
---|---|---|
ControlPlaneListener | 1.7 and earlier | 1.7 and earlier |
UseStrimziPodSets | - | 2.0 and earlier |
StableConnectIdentities | - | 2.3 and earlier |
13.6. Monitoring operators using Prometheus metrics
AMQ Streams operators expose Prometheus metrics. The metrics are automatically enabled and contain information about the following:
- Number of reconciliations
- Number of Custom Resources the operator is processing
- Duration of reconciliations
- JVM metrics from the operators
Additionally, AMQ Streams provides an example Grafana dashboard for the operator.