Chapter 5. Using AMQ Streams Operators
Use the AMQ Streams operators to manage your Kafka cluster, and Kafka topics and users.
5.1. Using the Cluster Operator Copy linkLink copied to clipboard!
The Cluster Operator is used to deploy a Kafka cluster and other Kafka components.
The Cluster Operator is deployed using YAML installation files.
For information on deploying the Cluster Operator, see Deploying the Cluster Operator in the Deploying and Upgrading AMQ Streams on OpenShift guide.
For information on the deployment options available for Kafka, see Kafka Cluster configuration.
On OpenShift, a Kafka Connect deployment can incorporate a Source2Image feature to provide a convenient way to add additional connectors.
5.1.1. Cluster Operator configuration Copy linkLink copied to clipboard!
The Cluster Operator can be configured through the following supported environment variables and through the logging configuration.
STRIMZI_NAMESPACEA comma-separated list of namespaces that the operator should operate in. When not set, set to empty string, or to
*the Cluster Operator will operate in all namespaces. The Cluster Operator deployment might use the OpenShift Downward API to set this automatically to the namespace the Cluster Operator is deployed in. See the example below:env: - name: STRIMZI_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceenv: - name: STRIMZI_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceCopy to Clipboard Copied! Toggle word wrap Toggle overflow -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS - Optional, default is 120000 ms. The interval between periodic reconciliations, in milliseconds.
STRIMZI_OPERATION_TIMEOUT_MS- Optional, default 300000 ms. The timeout for internal operations, in milliseconds. This value should be increased when using AMQ Streams on clusters where regular OpenShift operations take longer than usual (because of slow downloading of Docker images, for example).
STRIMZI_KAFKA_IMAGES-
Required. This provides a mapping from Kafka version to the corresponding Docker image containing a Kafka broker of that version. The required syntax is whitespace or comma separated
<version>=<image>pairs. For example2.5.0=registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.6.7, 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7. This is used when aKafka.spec.kafka.versionproperty is specified but not theKafka.spec.kafka.image, as described in Section 2.1.18, “Container images”. STRIMZI_DEFAULT_KAFKA_INIT_IMAGE-
Optional, default
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7. The image name to use as default for the init container started before the broker for initial configuration work (that is, rack support), if no image is specified as thekafka-init-imagein the Section 2.1.18, “Container images”. STRIMZI_KAFKA_CONNECT_IMAGES-
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing a Kafka connect of that version. The required syntax is whitespace or comma separated
<version>=<image>pairs. For example2.5.0=registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.6.7, 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7. This is used when aKafkaConnect.spec.versionproperty is specified but not theKafkaConnect.spec.image, as described in Section B.1.6, “image”. STRIMZI_KAFKA_CONNECT_S2I_IMAGES-
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing a Kafka connect of that version. The required syntax is whitespace or comma separated
<version>=<image>pairs. For example2.5.0=registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.6.7, 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7. This is used when aKafkaConnectS2I.spec.versionproperty is specified but not theKafkaConnectS2I.spec.image, as described in Section B.1.6, “image”. STRIMZI_KAFKA_MIRROR_MAKER_IMAGES-
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing a Kafka mirror maker of that version. The required syntax is whitespace or comma separated
<version>=<image>pairs. For example2.5.0=registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.6.7, 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7. This is used when aKafkaMirrorMaker.spec.versionproperty is specified but not theKafkaMirrorMaker.spec.image, as described in Section B.1.6, “image”. STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE-
Optional, default
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7. The image name to use as the default when deploying the topic operator, if no image is specified as theKafka.spec.entityOperator.topicOperator.imagein the Section 2.1.18, “Container images” of theKafkaresource. STRIMZI_DEFAULT_USER_OPERATOR_IMAGE-
Optional, default
registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7. The image name to use as the default when deploying the user operator, if no image is specified as theKafka.spec.entityOperator.userOperator.imagein the Section 2.1.18, “Container images” of theKafkaresource. STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE-
Optional, default
registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7. The image name to use as the default when deploying the sidecar container which provides TLS support for the Entity Operator, if no image is specified as theKafka.spec.entityOperator.tlsSidecar.imagein the Section 2.1.18, “Container images”. STRIMZI_IMAGE_PULL_POLICY-
Optional. The
ImagePullPolicywhich will be applied to containers in all pods managed by AMQ Streams Cluster Operator. The valid values areAlways,IfNotPresent, andNever. If not specified, the OpenShift defaults will be used. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters. STRIMZI_IMAGE_PULL_SECRETS-
Optional. A comma-separated list of
Secretnames. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are used in theimagePullSecretsfield for allPodscreated by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters. STRIMZI_KUBERNETES_VERSIONOptional. Overrides the OpenShift version information detected from the API server. See the example below:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KUBERNETES_SERVICE_DNS_DOMAINOptional. Overrides the default OpenShift DNS domain name suffix.
By default, services assigned in the OpenShift cluster have a DNS domain name that uses the default suffix
cluster.local.For example, for broker kafka-0:
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.localCopy to Clipboard Copied! Toggle word wrap Toggle overflow The DNS domain name is added to the Kafka broker certificates used for hostname verification.
If you are using a different DNS domain name suffix in your cluster, change the
KUBERNETES_SERVICE_DNS_DOMAINenvironment variable from the default to the one you are using in order to establish a connection with the Kafka brokers.
Configuration by ConfigMap
The Cluster Operator’s logging is configured by the strimzi-cluster-operator ConfigMap.
A ConfigMap containing logging configuration is created when installing the Cluster Operator. This ConfigMap is described in the file install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml. You configure Cluster Operator logging by changing the data field log4j2.properties in this ConfigMap.
To update the logging configuration, you can edit the 050-ConfigMap-strimzi-cluster-operator.yaml file and then run the following command:
oc apply -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
oc apply -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
Alternatively, edit the ConfigMap directly:
oc edit cm strimzi-cluster-operator
oc edit cm strimzi-cluster-operator
To change the frequency of the reload interval, set a time in seconds in the monitorInterval option in the created ConfigMap.
If the ConfigMap is missing when the Cluster Operator is deployed, the default logging values are used.
If the ConfigMap is accidentally deleted after the Cluster Operator is deployed, the most recently loaded logging configuration is used. Create a new ConfigMap to load a new logging configuration.
Do not remove the monitorInterval option from the ConfigMap.
5.1.1.1. Periodic reconciliation Copy linkLink copied to clipboard!
Although the Cluster Operator reacts to all notifications about the desired cluster resources received from the OpenShift cluster, if the operator is not running, or if a notification is not received for any reason, the desired resources will get out of sync with the state of the running OpenShift cluster.
In order to handle failovers properly, a periodic reconciliation process is executed by the Cluster Operator so that it can compare the state of the desired resources with the current cluster deployments in order to have a consistent state across all of them. You can set the time interval for the periodic reconciliations using the [STRIMZI_FULL_RECONCILIATION_INTERVAL_MS] variable.
5.1.2. Provisioning Role-Based Access Control (RBAC) Copy linkLink copied to clipboard!
For the Cluster Operator to function it needs permission within the OpenShift cluster to interact with resources such as Kafka, KafkaConnect, and so on, as well as the managed resources, such as ConfigMaps, Pods, Deployments, StatefulSets and Services. Such permission is described in terms of OpenShift role-based access control (RBAC) resources:
-
ServiceAccount, -
RoleandClusterRole, -
RoleBindingandClusterRoleBinding.
In addition to running under its own ServiceAccount with a ClusterRoleBinding, the Cluster Operator manages some RBAC resources for the components that need access to OpenShift resources.
OpenShift also includes privilege escalation protections that prevent components operating under one ServiceAccount from granting other ServiceAccounts privileges that the granting ServiceAccount does not have. Because the Cluster Operator must be able to create the ClusterRoleBindings, and RoleBindings needed by resources it manages, the Cluster Operator must also have those same privileges.
5.1.2.1. Delegated privileges Copy linkLink copied to clipboard!
When the Cluster Operator deploys resources for a desired Kafka resource it also creates ServiceAccounts, RoleBindings, and ClusterRoleBindings, as follows:
The Kafka broker pods use a
ServiceAccountcalledcluster-name-kafka-
When the rack feature is used, the
strimzi-cluster-name-kafka-initClusterRoleBindingis used to grant thisServiceAccountaccess to the nodes within the cluster via aClusterRolecalledstrimzi-kafka-broker - When the rack feature is not used no binding is created
-
When the rack feature is used, the
-
The ZooKeeper pods use a
ServiceAccountcalledcluster-name-zookeeper The Entity Operator pod uses a
ServiceAccountcalledcluster-name-entity-operator-
The Topic Operator produces OpenShift events with status information, so the
ServiceAccountis bound to aClusterRolecalledstrimzi-entity-operatorwhich grants this access via thestrimzi-entity-operatorRoleBinding
-
The Topic Operator produces OpenShift events with status information, so the
-
The pods for
KafkaConnectandKafkaConnectS2Iresources use aServiceAccountcalledcluster-name-cluster-connect -
The pods for
KafkaMirrorMakeruse aServiceAccountcalledcluster-name-mirror-maker -
The pods for
KafkaMirrorMaker2use aServiceAccountcalledcluster-name-mirrormaker2 -
The pods for
KafkaBridgeuse aServiceAccountcalledcluster-name-bridge
5.1.2.2. ServiceAccount Copy linkLink copied to clipboard!
The Cluster Operator is best run using a ServiceAccount:
Example ServiceAccount for the Cluster Operator
The Deployment of the operator then needs to specify this in its spec.template.spec.serviceAccountName:
Partial example of Deployment for the Cluster Operator
Note line 12, where the strimzi-cluster-operator ServiceAccount is specified as the serviceAccountName.
5.1.2.3. ClusterRoles Copy linkLink copied to clipboard!
The Cluster Operator needs to operate using ClusterRoles that gives access to the necessary resources. Depending on the OpenShift cluster setup, a cluster administrator might be needed to create the ClusterRoles.
Cluster administrator rights are only needed for the creation of the ClusterRoles. The Cluster Operator will not run under the cluster admin account.
The ClusterRoles follow the principle of least privilege and contain only those privileges needed by the Cluster Operator to operate Kafka, Kafka Connect, and ZooKeeper clusters. The first set of assigned privileges allow the Cluster Operator to manage OpenShift resources such as StatefulSets, Deployments, Pods, and ConfigMaps.
Cluster Operator uses ClusterRoles to grant permission at the namespace-scoped resources level and cluster-scoped resources level:
ClusterRole with namespaced resources for the Cluster Operator
The second includes the permissions needed for cluster-scoped resources.
ClusterRole with cluster-scoped resources for the Cluster Operator
The strimzi-kafka-broker ClusterRole represents the access needed by the init container in Kafka pods that is used for the rack feature. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka broker pods
The strimzi-topic-operator ClusterRole represents the access needed by the Topic Operator. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to events to the Topic Operator
The strimzi-kafka-client ClusterRole represents the access needed by the components based on Kafka clients which use the client rack-awareness. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka client based pods
5.1.2.4. ClusterRoleBindings Copy linkLink copied to clipboard!
The operator needs ClusterRoleBindings and RoleBindings which associates its ClusterRole with its ServiceAccount: ClusterRoleBindings are needed for ClusterRoles containing cluster-scoped resources.
Example ClusterRoleBinding for the Cluster Operator
ClusterRoleBindings are also needed for the ClusterRoles needed for delegation:
Example ClusterRoleBinding for the Cluster Operator for the Kafka broker rack-awarness
and
Example ClusterRoleBinding for the Cluster Operator for the Kafka client rack-awarness
ClusterRoles containing only namespaced resources are bound using RoleBindings only.
5.2. Using the Topic Operator Copy linkLink copied to clipboard!
When you create, modify or delete a topic using the KafkaTopic resource, the Topic Operator ensures those changes are reflected in the Kafka cluster.
The Deploying and Upgrading AMQ Streams on OpenShift guide provides instructions to deploy the Topic Operator:
5.2.1. Kafka topic resource Copy linkLink copied to clipboard!
The KafkaTopic resource is used to configure topics, including the number of partitions and replicas.
The full schema for KafkaTopic is described in KafkaTopic schema reference.
5.2.1.1. Identifying a Kafka cluster for topic handling Copy linkLink copied to clipboard!
A KafkaTopic resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.
For example:
The label is used by the Topic Operator to identify the KafkaTopic resource and create a new topic, and also in subsequent handling of the topic.
If the label does not match the Kafka cluster, the Topic Operator cannot identify the KafkaTopic and the topic is not created.
5.2.1.2. Handling changes to topics Copy linkLink copied to clipboard!
A fundamental problem that the Topic Operator has to solve is that there is no single source of truth: Both the KafkaTopic resource and the Kafka topic can be modified independently of the operator. Complicating this, the Topic Operator might not always be able to observe changes at each end in real time (for example, the operator might be down).
To resolve this, the operator maintains its own private copy of the information about each topic. When a change happens either in the Kafka cluster, or in OpenShift, it looks at both the state of the other system and at its private copy in order to determine what needs to change to keep everything in sync. The same thing happens whenever the operator starts, and periodically while it is running.
For example, suppose the Topic Operator is not running, and a KafkaTopic my-topic gets created. When the operator starts it will lack a private copy of "my-topic", so it can infer that the KafkaTopic has been created since it was last running. The operator will create the topic corresponding to my-topic, and also store a private copy of the metadata for my-topic.
The private copy allows the operator to cope with scenarios where the topic configuration gets changed both in Kafka and in OpenShift, so long as the changes are not incompatible (for example, both changing the same topic config key, but to different values). In the case of incompatible changes, the Kafka configuration wins, and the KafkaTopic will be updated to reflect that.
The private copy is held in the same ZooKeeper ensemble used by Kafka itself. This mitigates availability concerns, because if ZooKeeper is not running then Kafka itself cannot run, so the operator will be no less available than it would even if it was stateless.
5.2.1.3. Kafka topic usage recommendations Copy linkLink copied to clipboard!
When working with topics, be consistent. Always operate on either KafkaTopic resources or topics directly in OpenShift. Avoid routinely switching between both methods for a given topic.
Use topic names that reflect the nature of the topic, and remember that names cannot be changed later.
If creating a topic in Kafka, use a name that is a valid OpenShift resource name, otherwise the Topic Operator will need to create the corresponding KafkaTopic with a name that conforms to the OpenShift rules.
Recommendations for identifiers and names in OpenShift are outlined in Identifiers and Names in OpenShift community article.
5.2.1.4. Kafka topic naming conventions Copy linkLink copied to clipboard!
Kafka and OpenShift impose their own validation rules for the naming of topics in Kafka and KafkaTopic.metadata.name respectively. There are valid names for each which are invalid in the other.
Using the spec.topicName property, it is possible to create a valid topic in Kafka with a name that would be invalid for the Kafka topic in OpenShift.
The spec.topicName property inherits Kafka naming validation rules:
- The name must not be longer than 249 characters.
-
Valid characters for Kafka topics are ASCII alphanumerics,
.,_, and-. -
The name cannot be
.or.., though.can be used in a name, such asexampleTopic.or.exampleTopic.
spec.topicName must not be changed.
For example:
- 1
- Upper case is invalid in OpenShift.
cannot be changed to:
Some Kafka client applications, such as Kafka Streams, can create topics in Kafka programmatically. If those topics have names that are invalid OpenShift resource names, the Topic Operator gives them valid names based on the Kafka names. Invalid characters are replaced and a hash is appended to the name.
5.2.2. Configuring a Kafka topic Copy linkLink copied to clipboard!
Use the properties of the KafkaTopic resource to configure a Kafka topic.
You can use oc apply to create or modify topics, and oc delete to delete existing topics.
For example:
-
oc apply -f <topic-config-file> -
oc delete KafkaTopic <topic-name>
This procedure shows how to create a topic with 10 partitions and 2 replicas.
Before you start
It is important that you consider the following before making your changes:
Kafka does not support making the following changes through the
KafkaTopicresource:-
Changing topic names using
spec.topicName -
Decreasing partition size using
spec.partitions
-
Changing topic names using
-
You cannot use
spec.replicasto change the number of replicas that were initially specified. -
Increasing
spec.partitionsfor topics with keys will change how records are partitioned, which can be particularly problematic when the topic uses semantic partitioning.
Prerequisites
- A running Kafka cluster configured with a Kafka broker listener using TLS authentication and encryption.
- A running Topic Operator (typically deployed with the Entity Operator).
-
For deleting a topic,
delete.topic.enable=true(default) in thespec.kafka.configof theKafkaresource.
Procedure
Prepare a file containing the
KafkaTopicto be created.An example
KafkaTopicCopy to Clipboard Copied! Toggle word wrap Toggle overflow TipWhen modifying a topic, you can get the current version of the resource using
oc get kafkatopic orders -o yaml.Create the
KafkaTopicresource in OpenShift.oc apply -f TOPIC-CONFIG-FILE
oc apply -f TOPIC-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
5.2.3. Configuring the Topic Operator with resource requests and limits Copy linkLink copied to clipboard!
You can allocate resources, such as CPU and memory, to the Topic Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
Update the Kafka cluster configuration in an editor, as required:
oc edit kafka MY-CLUSTER
oc edit kafka MY-CLUSTERCopy to Clipboard Copied! Toggle word wrap Toggle overflow In the
spec.entityOperator.topicOperator.resourcesproperty in theKafkaresource, set the resource requests and limits for the Topic Operator.Copy to Clipboard Copied! Toggle word wrap Toggle overflow Apply the new configuration to create or update the resource.
oc apply -f KAFKA-CONFIG-FILE
oc apply -f KAFKA-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
5.3. Using the User Operator Copy linkLink copied to clipboard!
When you create, modify or delete a user using the KafkaUser resource, the User Operator ensures those changes are reflected in the Kafka cluster.
The Deploying and Upgrading AMQ Streams on OpenShift guide provides instructions to deploy the User Operator:
For more information about the schema, see KafkaUser schema reference.
Authenticating and authorizing access to Kafka
Use KafkaUser to enable the authentication and authorization mechanisms that a specific client uses to access Kafka.
For more information on using KafkUser to manage users and secure access to Kafka brokers, see Securing access to Kafka brokers.
5.3.1. Configuring the User Operator with resource requests and limits Copy linkLink copied to clipboard!
You can allocate resources, such as CPU and memory, to the User Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
Update the Kafka cluster configuration in an editor, as required:
oc edit kafka MY-CLUSTER
oc edit kafka MY-CLUSTERCopy to Clipboard Copied! Toggle word wrap Toggle overflow In the
spec.entityOperator.userOperator.resourcesproperty in theKafkaresource, set the resource requests and limits for the User Operator.Copy to Clipboard Copied! Toggle word wrap Toggle overflow Save the file and exit the editor. The Cluster Operator applies the changes automatically.
5.4. Monitoring operators using Prometheus metrics Copy linkLink copied to clipboard!
AMQ Streams operators expose Prometheus metrics. The metrics are automatically enabled and contain information about:
- Number of reconciliations
- Number of Custom Resources the operator is processing
- Duration of reconciliations
- JVM metrics from the operators
Additionally, we provide an example Grafana dashboard.
For more information about Prometheus, see the Introducing Metrics to Kafka in the Deploying and Upgrading AMQ Streams on OpenShift guide.