Chapter 5. Using AMQ Streams Operators
Use the AMQ Streams operators to manage your Kafka cluster, and Kafka topics and users.
5.1. Using the Cluster Operator
The Cluster Operator is used to deploy a Kafka cluster and other Kafka components.
The Cluster Operator is deployed using YAML installation files.
On OpenShift, a Kafka Connect deployment can incorporate a Source2Image feature to provide a convenient way to add additional connectors.
5.1.1. Cluster Operator configuration
You can configure the Cluster Operator using supported environment variables, and through its logging configuration.
The environment variables relate to container configuration for the deployment of the Cluster Operator image. For more information on image configuration, see Section 13.1.6, “image”.
STRIMZI_NAMESPACE
A comma-separated list of namespaces that the operator should operate in. When not set, set to an empty string, or set to *, the Cluster Operator operates in all namespaces. The Cluster Operator deployment might use the OpenShift Downward API to set this automatically to the namespace the Cluster Operator is deployed in.
Example configuration for Cluster Operator namespaces
env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
Optional, default 120000 ms. The interval between periodic reconciliations, in milliseconds.
STRIMZI_OPERATION_TIMEOUT_MS
Optional, default 300000 ms. The timeout for internal operations, in milliseconds. Increase this value when using AMQ Streams on clusters where regular OpenShift operations take longer than usual (because of slow downloading of Docker images, for example).
STRIMZI_OPERATIONS_THREAD_POOL_SIZE
Optional, default 10. The worker thread pool size, which is used for various asynchronous and blocking operations that are run by the Cluster Operator.
STRIMZI_OPERATOR_NAMESPACE
The name of the namespace where the AMQ Streams Cluster Operator is running. Do not configure this variable manually. Use the OpenShift Downward API.
env:
  - name: STRIMZI_OPERATOR_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_OPERATOR_NAMESPACE_LABELS
Optional. The labels of the namespace where the AMQ Streams Cluster Operator is running. Namespace labels are used to configure the namespace selector in network policies to allow the AMQ Streams Cluster Operator to only have access to the operands from the namespace with these labels. When not set, the namespace selector in network policies is configured to allow access to the AMQ Streams Cluster Operator from any namespace in the OpenShift cluster.
env:
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
STRIMZI_CUSTOM_RESOURCE_SELECTOR
Optional. Specifies the label selector used to filter the custom resources handled by the operator. The operator operates only on custom resources that have the specified labels set. Resources without these labels are not seen by the operator. The label selector applies to Kafka, KafkaConnect, KafkaConnectS2I, KafkaBridge, KafkaMirrorMaker, and KafkaMirrorMaker2 resources. KafkaRebalance and KafkaConnector resources are operated only when their corresponding Kafka and Kafka Connect clusters have the matching labels.
env:
  - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR
    value: label1=value1,label2=value2
STRIMZI_LABELS_EXCLUSION_PATTERN
Optional. Specifies the regex exclusion pattern used to filter label propagation from the main custom resource to its subresources; the default pattern is ^app.kubernetes.io/(?!part-of).* when the variable is not set. The labels exclusion filter is not applied to labels in template sections such as spec.kafka.template.pod.metadata.labels.
env:
  - name: STRIMZI_LABELS_EXCLUSION_PATTERN
    value: "^key1.*"
STRIMZI_KAFKA_IMAGES
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing a Kafka broker of that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel8:1.8.4, 2.8.0=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4. This is used when a Kafka.spec.kafka.version property is specified but not the Kafka.spec.kafka.image in the Kafka resource.
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
Optional, default registry.redhat.io/amq7/amq-streams-rhel8-operator:1.8.4. The image name to use as default for the init container started before the broker for initial configuration work (that is, rack support), if no image is specified as the kafka-init-image in the Kafka resource.
STRIMZI_KAFKA_CONNECT_IMAGES
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Connect of that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel8:1.8.4, 2.8.0=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4. This is used when a KafkaConnect.spec.version property is specified but not the KafkaConnect.spec.image.
STRIMZI_KAFKA_CONNECT_S2I_IMAGES
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Connect of that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel8:1.8.4, 2.8.0=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4. This is used when a KafkaConnectS2I.spec.version property is specified but not the KafkaConnectS2I.spec.image.
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka MirrorMaker of that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel8:1.8.4, 2.8.0=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4. This is used when a KafkaMirrorMaker.spec.version property is specified but not the KafkaMirrorMaker.spec.image.
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
Optional, default registry.redhat.io/amq7/amq-streams-rhel8-operator:1.8.4. The image name to use as the default when deploying the Topic Operator, if no image is specified as the Kafka.spec.entityOperator.topicOperator.image in the Kafka resource.
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
Optional, default registry.redhat.io/amq7/amq-streams-rhel8-operator:1.8.4. The image name to use as the default when deploying the User Operator, if no image is specified as the Kafka.spec.entityOperator.userOperator.image in the Kafka resource.
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
Optional, default registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4. The image name to use as the default when deploying the sidecar container which provides TLS support for the Entity Operator, if no image is specified as the Kafka.spec.entityOperator.tlsSidecar.image in the Kafka resource.
STRIMZI_IMAGE_PULL_POLICY
Optional. The ImagePullPolicy that is applied to containers in all pods managed by the AMQ Streams Cluster Operator. The valid values are Always, IfNotPresent, and Never. If not specified, the OpenShift defaults are used. Changing the policy results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
STRIMZI_IMAGE_PULL_SECRETS
Optional. A comma-separated list of Secret names. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are used in the imagePullSecrets field for all Pods created by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
STRIMZI_KUBERNETES_VERSION
Optional. Overrides the Kubernetes version information detected from the API server.
Example configuration for Kubernetes version override
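A minimal sketch of such an override, assuming the value is supplied as a multi-line block of key=value version fields. The field names mirror those reported by the API server version endpoint, and the values shown are purely illustrative assumptions:

```yaml
env:
  - name: STRIMZI_KUBERNETES_VERSION
    # Assumed format: one key=value pair per line (illustrative values)
    value: |
      major=1
      minor=16
      gitVersion=v1.16.2
      platform=linux/amd64
```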
KUBERNETES_SERVICE_DNS_DOMAIN
Optional. Overrides the default OpenShift DNS domain name suffix.
By default, services assigned in the OpenShift cluster have a DNS domain name that uses the default suffix cluster.local.
For example, for broker kafka-0:
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local
The DNS domain name is added to the Kafka broker certificates used for hostname verification.
If you are using a different DNS domain name suffix in your cluster, change the KUBERNETES_SERVICE_DNS_DOMAIN environment variable from the default to the one you are using in order to establish a connection with the Kafka brokers.
STRIMZI_CONNECT_BUILD_TIMEOUT_MS
Optional, default 300000 ms. The timeout for building new Kafka Connect images with additional connectors, in milliseconds. Increase this value when using AMQ Streams to build container images containing many connectors or when using a slow container registry.
STRIMZI_FEATURE_GATES
Optional. Enables or disables features and functionality controlled by feature gates. For more information about each feature gate, see Section 5.1.1.1, “Feature gates”.
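As an illustration of how several of these variables might be combined in the Cluster Operator Deployment, the following sketch sets an explicit namespace list and adjusts the interval and timeouts. The namespace names and the chosen values are hypothetical; only the variable names come from the list above.

```yaml
env:
  # Watch two specific namespaces (hypothetical names) instead of all namespaces
  - name: STRIMZI_NAMESPACE
    value: kafka-dev,kafka-prod
  # Reconcile every 60 seconds instead of the default 120000 ms
  - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
    value: "60000"
  # Allow more time on a slow cluster (default 300000 ms)
  - name: STRIMZI_OPERATION_TIMEOUT_MS
    value: "600000"
  # Allow more time for Kafka Connect image builds (default 300000 ms)
  - name: STRIMZI_CONNECT_BUILD_TIMEOUT_MS
    value: "600000"
```

The numeric values are quoted because environment variable values in a container spec are strings.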
5.1.1.1. Feature gates
AMQ Streams operators support feature gates to enable or disable certain features and functionality. Enabling a feature gate changes the behavior of the relevant operator and introduces the feature to your AMQ Streams deployment.
Feature gates have a default state of either enabled or disabled. To modify a feature gate’s default state, use the STRIMZI_FEATURE_GATES environment variable in the operator’s configuration. You can modify multiple feature gates using this single environment variable.
Feature gates have three stages of maturity:
- Alpha — typically disabled by default
- Beta — typically enabled by default
- General Availability (GA) — typically enabled by default
Alpha stage features might be experimental or unstable, subject to change, or not sufficiently tested for production use. Beta stage features are well tested and their functionality is not likely to change. GA stage features are stable and should not change in future. Alpha and beta stage features are removed if they do not prove to be useful.
Feature gates might be removed when they reach GA. This means that the feature was incorporated into the AMQ Streams core features and can no longer be disabled.
Feature gate | Alpha | Beta | GA |
---|---|---|---|
ControlPlaneListener | 1.8 | - | - |
ServiceAccountPatching | 1.8 | - | - |
Configuring feature gates
You configure feature gates using the STRIMZI_FEATURE_GATES environment variable in the operator’s configuration. Specify a comma-separated list of feature gate names and prefixes. A + prefix enables the feature gate and a - prefix disables it.
Example feature gate configuration that enables FeatureGate1 and disables FeatureGate2
env:
  - name: STRIMZI_FEATURE_GATES
    value: +FeatureGate1,-FeatureGate2
5.1.1.1.1. Control plane listener feature gate
Use the ControlPlaneListener feature gate to change the communication paths used for inter-broker communications within your Kafka cluster.
The OpenShift control plane manages the workloads running on the worker nodes. Services such as the Kubernetes API server and the controller manager run on the control plane. The OpenShift data plane provides resources to containers, including CPU, memory, network, and storage.
In AMQ Streams, control plane traffic consists of controller connections that maintain the desired state of the Kafka cluster. Data plane traffic mainly consists of data replication between the leader broker and the follower brokers.
When the ControlPlaneListener feature gate is disabled, control plane and data plane traffic go through the same internal listener on port 9091. This was the default behavior before the feature gate was introduced.
When ControlPlaneListener is enabled, control plane traffic goes through a dedicated control plane listener on port 9090. Data plane traffic continues to use the internal listener on port 9091.
Using control plane listeners might improve performance because important controller connections, such as partition leadership changes, are not delayed by data replication across brokers.
Enabling the control plane listener feature gate
The ControlPlaneListener feature gate is in the alpha stage and has a default state of disabled. To enable it, specify +ControlPlaneListener in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
This feature gate must be disabled when:
- Upgrading from AMQ Streams 1.7 and earlier
- Downgrading to AMQ Streams 1.7 and earlier
The ControlPlaneListener feature gate was introduced in AMQ Streams 1.8 and is expected to remain in the alpha stage for a number of releases before moving to the beta stage.
5.1.1.1.2. Service Account patching feature gate
By default, the Cluster Operator does not update service accounts. To allow the Cluster Operator to apply updates, enable the ServiceAccountPatching feature gate.
Add +ServiceAccountPatching to the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
The feature gate is currently in the alpha stage and disabled by default. With the feature gate enabled, the Cluster Operator applies updates to service account configuration in every reconciliation. For example, you can change service account labels and annotations after the operands are already created.
The ServiceAccountPatching feature gate was introduced in AMQ Streams 1.8 and is expected to remain in the alpha stage for a number of releases before it moves to the beta stage and is enabled by default.
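Because a single variable carries all feature gate settings, enabling both alpha gates described in this section might look like the following sketch of the Cluster Operator env configuration:

```yaml
env:
  - name: STRIMZI_FEATURE_GATES
    # The + prefix enables each gate; separate multiple gates with commas
    value: +ControlPlaneListener,+ServiceAccountPatching
```

Omit +ControlPlaneListener from the list while upgrading from, or downgrading to, AMQ Streams 1.7 and earlier, as noted above.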
5.1.1.2. Logging configuration by ConfigMap
The Cluster Operator’s logging is configured by the strimzi-cluster-operator ConfigMap.
A ConfigMap containing logging configuration is created when installing the Cluster Operator. This ConfigMap is described in the file install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml. You configure Cluster Operator logging by changing the data field log4j2.properties in this ConfigMap.
To update the logging configuration, you can edit the 050-ConfigMap-strimzi-cluster-operator.yaml file and then run the following command:
oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml
Alternatively, edit the ConfigMap directly:
oc edit configmap strimzi-cluster-operator
To change the reload interval, set a time in seconds in the monitorInterval option in the created ConfigMap.
If the ConfigMap is missing when the Cluster Operator is deployed, the default logging values are used.
If the ConfigMap is accidentally deleted after the Cluster Operator is deployed, the most recently loaded logging configuration is used. Create a new ConfigMap to load a new logging configuration.
Do not remove the monitorInterval option from the ConfigMap.
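For orientation, a logging ConfigMap of this kind might look like the following sketch. The ConfigMap name, the log4j2.properties data field, and the monitorInterval option come from the text above; the appender layout and logger settings are illustrative Log4j 2 properties, not a copy of the shipped file.

```yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: strimzi-cluster-operator
data:
  log4j2.properties: |
    name = COConfig
    # Reload interval in seconds; keep this option present
    monitorInterval = 30

    appender.console.type = Console
    appender.console.name = STDOUT
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

    # Illustrative root logger level
    rootLogger.level = INFO
    rootLogger.appenderRefs = stdout
    rootLogger.appenderRef.console.ref = STDOUT
```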
5.1.1.3. Restricting Cluster Operator access with network policy
The Cluster Operator can run in the same namespace as the resources it manages, or in a separate namespace. By default, the STRIMZI_OPERATOR_NAMESPACE environment variable is configured to use the OpenShift Downward API to find which namespace the Cluster Operator is running in. If the Cluster Operator is running in the same namespace as the resources, only local access is required, and allowed by AMQ Streams.
If the Cluster Operator is running in a separate namespace to the resources it manages, any namespace in the OpenShift cluster is allowed access to the Cluster Operator unless network policy is configured. Use the optional STRIMZI_OPERATOR_NAMESPACE_LABELS environment variable to establish network policy for the Cluster Operator using namespace labels. By adding namespace labels, access to the Cluster Operator is restricted to the namespaces specified.
Network policy configured for the Cluster Operator deployment
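A sketch of how the pieces might fit together, assuming the Cluster Operator runs in a namespace named strimzi-operators (a hypothetical name) that carries the labels referenced by the environment variable. The second document is a fragment of the Cluster Operator Deployment, not a complete resource:

```yaml
# Namespace where the Cluster Operator runs (hypothetical name), carrying the labels
apiVersion: v1
kind: Namespace
metadata:
  name: strimzi-operators
  labels:
    label1: value1
    label2: value2
---
# Fragment of the Cluster Operator Deployment: container env section
env:
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
```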
5.1.1.4. Periodic reconciliation
Although the Cluster Operator reacts to all notifications about the desired cluster resources received from the OpenShift cluster, if the operator is not running, or if a notification is not received for any reason, the desired resources can get out of sync with the state of the running OpenShift cluster.
To handle failovers properly, the Cluster Operator runs a periodic reconciliation process that compares the state of the desired resources with the current cluster deployments and brings them back to a consistent state. You can set the time interval for the periodic reconciliations using the STRIMZI_FULL_RECONCILIATION_INTERVAL_MS variable.
5.1.2. Provisioning Role-Based Access Control (RBAC)
For the Cluster Operator to function it needs permission within the OpenShift cluster to interact with resources such as Kafka, KafkaConnect, and so on, as well as the managed resources, such as ConfigMaps, Pods, Deployments, StatefulSets and Services. Such permission is described in terms of OpenShift role-based access control (RBAC) resources:
- ServiceAccount
- Role and ClusterRole
- RoleBinding and ClusterRoleBinding
In addition to running under its own ServiceAccount with a ClusterRoleBinding, the Cluster Operator manages some RBAC resources for the components that need access to OpenShift resources.
OpenShift also includes privilege escalation protections that prevent components operating under one ServiceAccount from granting other ServiceAccounts privileges that the granting ServiceAccount does not have. Because the Cluster Operator must be able to create the ClusterRoleBindings and RoleBindings needed by resources it manages, the Cluster Operator must also have those same privileges.
5.1.2.1. Delegated privileges
When the Cluster Operator deploys resources for a desired Kafka resource it also creates ServiceAccounts, RoleBindings, and ClusterRoleBindings, as follows:
- The Kafka broker pods use a ServiceAccount called cluster-name-kafka
  - When the rack feature is used, the strimzi-cluster-name-kafka-init ClusterRoleBinding is used to grant this ServiceAccount access to the nodes within the cluster via a ClusterRole called strimzi-kafka-broker
  - When the rack feature is not used and the cluster is not exposed via nodeport, no binding is created
- The ZooKeeper pods use a ServiceAccount called cluster-name-zookeeper
- The Entity Operator pod uses a ServiceAccount called cluster-name-entity-operator
  - The Topic Operator produces OpenShift events with status information, so the ServiceAccount is bound to a ClusterRole called strimzi-entity-operator which grants this access via the strimzi-entity-operator RoleBinding
- The pods for KafkaConnect and KafkaConnectS2I resources use a ServiceAccount called cluster-name-cluster-connect
- The pods for KafkaMirrorMaker use a ServiceAccount called cluster-name-mirror-maker
- The pods for KafkaMirrorMaker2 use a ServiceAccount called cluster-name-mirrormaker2
- The pods for KafkaBridge use a ServiceAccount called cluster-name-bridge
5.1.2.2. ServiceAccount
The Cluster Operator is best run using a ServiceAccount:
Example ServiceAccount for the Cluster Operator
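A minimal sketch of such a ServiceAccount, using the strimzi-cluster-operator name referenced in this section. The app label is an illustrative assumption:

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi   # illustrative label
```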
The Deployment of the operator then needs to specify this in its spec.template.spec.serviceAccountName:
Partial example of Deployment for the Cluster Operator
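The following partial sketch shows where serviceAccountName sits in a Deployment. The selector and pod labels are illustrative assumptions, and the container definition is elided:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi                        # illustrative label
spec:
  replicas: 1
  selector:
    matchLabels:
      name: strimzi-cluster-operator    # illustrative label
  template:
    metadata:
      labels:
        name: strimzi-cluster-operator
    spec:
      # Run the operator pod under the dedicated ServiceAccount
      serviceAccountName: strimzi-cluster-operator
      # containers: ...
```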
In this Deployment, the strimzi-cluster-operator ServiceAccount is specified as the serviceAccountName.
5.1.2.3. ClusterRoles
The Cluster Operator needs to operate using ClusterRoles that give access to the necessary resources. Depending on the OpenShift cluster setup, a cluster administrator might be needed to create the ClusterRoles.
Cluster administrator rights are only needed for the creation of the ClusterRoles. The Cluster Operator will not run under the cluster admin account.
The ClusterRoles follow the principle of least privilege and contain only those privileges needed by the Cluster Operator to operate Kafka, Kafka Connect, and ZooKeeper clusters. The first set of assigned privileges allows the Cluster Operator to manage OpenShift resources such as StatefulSets, Deployments, Pods, and ConfigMaps.
The Cluster Operator uses ClusterRoles to grant permission at the namespace-scoped resources level and cluster-scoped resources level:
ClusterRole with namespaced resources for the Cluster Operator
The second includes the permissions needed for cluster-scoped resources.
ClusterRole with cluster-scoped resources for the Cluster Operator
The strimzi-kafka-broker ClusterRole represents the access needed by the init container in Kafka pods that is used for the rack feature. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka broker pods
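As a sketch, a ClusterRole granting read access to nodes might be written as follows. The role name comes from the text above; the exact rule set (a single get verb on nodes) is an assumption based on the description of rack support, and the label is illustrative:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-broker
  labels:
    app: strimzi   # illustrative label
rules:
  - apiGroups:
      - ""         # core API group, which contains nodes
    resources:
      - nodes
    verbs:
      - get        # assumed: read-only access for rack information
```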
The strimzi-topic-operator ClusterRole represents the access needed by the Topic Operator. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to events to the Topic Operator
The strimzi-kafka-client ClusterRole represents the access needed by the components based on Kafka clients which use the client rack-awareness. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka client based pods
5.1.2.4. ClusterRoleBindings
The operator needs ClusterRoleBindings and RoleBindings that associate its ClusterRole with its ServiceAccount. ClusterRoleBindings are needed for ClusterRoles containing cluster-scoped resources.
Example ClusterRoleBinding for the Cluster Operator
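A sketch of such a binding follows. The ServiceAccount name comes from this chapter; the ClusterRole name strimzi-cluster-operator-global and the namespace myproject are assumptions used only for illustration:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi                           # illustrative label
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject                   # hypothetical: namespace of the Cluster Operator
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-global    # assumed name of the cluster-scoped ClusterRole
  apiGroup: rbac.authorization.k8s.io
```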
ClusterRoleBindings are also needed for the ClusterRoles needed for delegation:
Example ClusterRoleBinding for the Cluster Operator for the Kafka broker rack-awareness
and
Example ClusterRoleBinding for the Cluster Operator for the Kafka client rack-awareness
ClusterRoles containing only namespaced resources are bound using RoleBindings only.
5.1.3. Configuring the Cluster Operator with default proxy settings
If you are running a Kafka cluster behind an HTTP proxy, you can still pass data in and out of the cluster. For example, you can run Kafka Connect with connectors that push and pull data from outside the proxy. Or you can use a proxy to connect with an authorization server.
Configure the Cluster Operator deployment to specify the proxy environment variables. The Cluster Operator accepts standard proxy configuration (HTTP_PROXY, HTTPS_PROXY and NO_PROXY) as environment variables. The proxy settings are applied to all AMQ Streams containers.
The format for a proxy address is http://IP-ADDRESS:PORT-NUMBER. To set up a proxy with a name and password, the format is http://USERNAME:PASSWORD@IP-ADDRESS:PORT-NUMBER.
Prerequisites
This procedure requires use of an OpenShift user account which is able to create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
Procedure
To add proxy environment variables to the Cluster Operator, update its Deployment configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml).
Example proxy configuration for the Cluster Operator
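A sketch of what the proxy variables might look like in the Deployment, using the address format described above. The proxy address (taken from a documentation-only IP range), the NO_PROXY entries, and the container name are hypothetical:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
spec:
  # ...
  template:
    spec:
      serviceAccountName: strimzi-cluster-operator
      containers:
        - name: strimzi-cluster-operator   # assumed container name
          # ...
          env:
            # Hypothetical proxy address in http://IP-ADDRESS:PORT-NUMBER format
            - name: HTTP_PROXY
              value: "http://192.0.2.10:3128"
            - name: HTTPS_PROXY
              value: "http://192.0.2.10:3128"
            # Hypothetical hosts that bypass the proxy
            - name: NO_PROXY
              value: "localhost,.svc,.cluster.local"
```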
Alternatively, edit the Deployment directly:
oc edit deployment strimzi-cluster-operator
If you updated the YAML file instead of editing the Deployment directly, apply the changes:
oc create -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
5.2. Using the Topic Operator
When you create, modify or delete a topic using the KafkaTopic resource, the Topic Operator ensures those changes are reflected in the Kafka cluster.
The Deploying and Upgrading AMQ Streams on OpenShift guide provides instructions to deploy the Topic Operator.
5.2.1. Kafka topic resource
The KafkaTopic resource is used to configure topics, including the number of partitions and replicas.
The full schema for KafkaTopic is described in KafkaTopic schema reference.
5.2.1.1. Identifying a Kafka cluster for topic handling
A KafkaTopic resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.
For example:
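A minimal sketch of such a label, assuming a Kafka resource named my-cluster and a topic named my-topic (both hypothetical names):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    # Must match the name of the Kafka resource that owns the topic
    strimzi.io/cluster: my-cluster
```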
The label is used by the Topic Operator to identify the KafkaTopic resource and create a new topic, and also in subsequent handling of the topic.
If the label does not match the Kafka cluster, the Topic Operator cannot identify the KafkaTopic and the topic is not created.
5.2.1.2. Kafka topic usage recommendations
When working with topics, be consistent. Always operate on either KafkaTopic resources in OpenShift or topics directly in Kafka. Avoid routinely switching between both methods for a given topic.
Use topic names that reflect the nature of the topic, and remember that names cannot be changed later.
If creating a topic in Kafka, use a name that is a valid OpenShift resource name, otherwise the Topic Operator will need to create the corresponding KafkaTopic with a name that conforms to the OpenShift rules.
Recommendations for identifiers and names in OpenShift are outlined in the Identifiers and Names in OpenShift community article.
5.2.1.3. Kafka topic naming conventions
Kafka and OpenShift impose their own validation rules for the naming of topics in Kafka and KafkaTopic.metadata.name respectively. There are valid names for each which are invalid in the other.
Using the spec.topicName property, it is possible to create a valid topic in Kafka with a name that would be invalid for the Kafka topic in OpenShift.
The spec.topicName property inherits Kafka naming validation rules:
- The name must not be longer than 249 characters.
- Valid characters for Kafka topics are ASCII alphanumerics, ., _, and -.
- The name cannot be . or .., though . can be used in a name, such as exampleTopic. or .exampleTopic.
spec.topicName must not be changed.
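For example, a spec.topicName that contains upper case characters, which are invalid in an OpenShift resource name, cannot be changed to a different value after the topic is created.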
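To illustrate the rule that spec.topicName is fixed once set, the following sketch shows a KafkaTopic whose metadata.name is a valid OpenShift name while spec.topicName keeps the upper case Kafka name. The names and the cluster label value are hypothetical:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1          # valid OpenShift resource name (lower case)
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # Upper case is valid in Kafka but invalid in an OpenShift resource name.
  # Once the topic exists, do not edit this value (for example, to name-2).
  topicName: topicName-1
  partitions: 1
  replicas: 3
```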
Some Kafka client applications, such as Kafka Streams, can create topics in Kafka programmatically. If those topics have names that are invalid OpenShift resource names, the Topic Operator gives them a valid metadata.name based on the Kafka name. Invalid characters are replaced and a hash is appended to the name.
5.2.2. Topic Operator topic store
The Topic Operator uses Kafka to store topic metadata describing topic configuration as key-value pairs. The topic store is based on the Kafka Streams key-value mechanism, which uses Kafka topics to persist the state.
Topic metadata is cached in-memory and accessed locally within the Topic Operator. Updates from operations applied to the local in-memory cache are persisted to a backup topic store on disk. The topic store is continually synchronized with updates from Kafka topics or OpenShift KafkaTopic custom resources. Operations are handled rapidly with the topic store set up this way, but should the in-memory cache crash it is automatically repopulated from the persistent storage.
5.2.2.1. Internal topic store topics
Internal topics support the handling of topic metadata in the topic store.
__strimzi_store_topic
Input topic for storing the topic metadata
__strimzi-topic-operator-kstreams-topic-store-changelog
Retains a log of compacted topic store values
Do not delete these topics, as they are essential to the running of the Topic Operator.
5.2.2.2. Migrating topic metadata from ZooKeeper
In previous releases of AMQ Streams, topic metadata was stored in ZooKeeper. The new process removes this requirement, bringing the metadata into the Kafka cluster, and under the control of the Topic Operator.
When upgrading to AMQ Streams 1.8, the transition to Topic Operator control of the topic store is seamless. Metadata is found and migrated from ZooKeeper, and the old store is deleted.
5.2.2.3. Downgrading to an AMQ Streams version that uses ZooKeeper to store topic metadata
If you are reverting to a version of AMQ Streams earlier than 0.22, which uses ZooKeeper for the storage of topic metadata, you still downgrade your Cluster Operator to the previous version, then downgrade Kafka brokers and client applications to the previous Kafka version as standard.
However, you must also delete the topics that were created for the topic store using a kafka-admin command, specifying the bootstrap address of the Kafka cluster. For example:
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
The command must correspond to the type of listener and authentication used to access the Kafka cluster.
The Topic Operator will reconstruct the ZooKeeper topic metadata from the state of the topics in Kafka.
5.2.2.4. Topic Operator topic replication and scaling Copy linkLink copied to clipboard!
The recommended configuration for topics managed by the Topic Operator is a topic replication factor of 3, and a minimum of 2 in-sync replicas.
- spec.partitions: The number of partitions for the topic. Generally, 1 partition is sufficient.
- spec.replicas: The number of replica topic partitions. Currently, this cannot be changed in the KafkaTopic resource, but it can be changed using the kafka-reassign-partitions.sh tool.
- min.insync.replicas (in spec.config): The minimum number of replica partitions that a message must be successfully written to, or an exception is raised.
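The recommended settings described above could be expressed in a KafkaTopic resource along the following lines. This is a minimal sketch, not the guide's original example: the names my-topic and my-cluster are placeholders, and the v1beta2 API version is assumed to match the AMQ Streams release in use.

```yaml
# Sketch of a KafkaTopic with the recommended replication settings.
# my-topic and my-cluster are placeholder names.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster  # Kafka cluster that the topic belongs to
spec:
  partitions: 1                     # number of partitions for the topic
  replicas: 3                       # replication factor
  config:
    min.insync.replicas: 2          # minimum in-sync replicas for a successful write
```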
In-sync replicas are used in conjunction with the acks configuration for producer applications. The acks configuration determines the number of follower partitions a message must be replicated to before the message is acknowledged as successfully received. The Topic Operator runs with acks=all, whereby messages must be acknowledged by all in-sync replicas.
When scaling Kafka clusters by adding or removing brokers, replication factor configuration is not changed and replicas are not reassigned automatically. However, you can use the kafka-reassign-partitions.sh tool to change the replication factor, and manually reassign replicas to brokers.
Alternatively, though the integration of Cruise Control for AMQ Streams cannot change the replication factor for topics, the optimization proposals it generates for rebalancing Kafka include commands that transfer partition replicas and change partition leadership.
5.2.2.5. Handling changes to topics
A fundamental problem that the Topic Operator needs to solve is that there is no single source of truth: both the KafkaTopic resource and the Kafka topic can be modified independently of the Topic Operator. Complicating this, the Topic Operator might not always be able to observe changes at each end in real time, for example, when the Topic Operator is down.
To resolve this, the Topic Operator maintains information about each topic in the topic store. When a change happens in the Kafka cluster or OpenShift, it looks at both the state of the other system and the topic store in order to determine what needs to change to keep everything in sync. The same thing happens whenever the Topic Operator starts, and periodically while it is running.
For example, suppose the Topic Operator is not running, and a KafkaTopic called my-topic is created. When the Topic Operator starts, the topic store does not contain information on my-topic, so it can infer that the KafkaTopic was created after it was last running. The Topic Operator creates the topic corresponding to my-topic, and also stores metadata for my-topic in the topic store.
If you update Kafka topic configuration or apply a change through the KafkaTopic custom resource, the topic store is updated after the Kafka cluster is reconciled.
The topic store also allows the Topic Operator to manage scenarios where the topic configuration is changed in Kafka topics and updated through OpenShift KafkaTopic custom resources, as long as the changes are not incompatible. For example, it is possible to make changes to the same topic config key, but to different values. For incompatible changes, the Kafka configuration takes priority, and the KafkaTopic is updated accordingly.
You can also use the KafkaTopic resource to delete topics using an oc delete -f KAFKA-TOPIC-CONFIG-FILE command. To be able to do this, delete.topic.enable must be set to true (default) in the spec.kafka.config of the Kafka resource.
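As an illustration of where this setting lives, the fragment below sketches the relevant part of a Kafka resource. It is deliberately incomplete: the cluster name my-cluster is a placeholder, and the other required sections of the resource are omitted. Because true is the default, the property only needs to be set explicitly if it was previously disabled.

```yaml
# Fragment of a Kafka resource showing only the topic deletion setting.
# my-cluster is a placeholder; other required sections are omitted.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ... replicas, listeners, storage
    config:
      delete.topic.enable: true   # default; must not be false if topics are deleted through KafkaTopic
  # ... zookeeper and entityOperator sections
```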
5.2.3. Configuring a Kafka topic
Use the properties of the KafkaTopic resource to configure a Kafka topic.
You can use oc apply to create or modify topics, and oc delete to delete existing topics.
For example:
- oc apply -f <topic-config-file>
- oc delete KafkaTopic <topic-name>
This procedure shows how to create a topic with 10 partitions and 2 replicas.
Before you start
It is important that you consider the following before making your changes:
- Kafka does not support making the following changes through the KafkaTopic resource:
  - Changing topic names using spec.topicName
  - Decreasing the number of partitions using spec.partitions
- You cannot use spec.replicas to change the number of replicas that were initially specified.
- Increasing spec.partitions for topics with keys will change how records are partitioned, which can be particularly problematic when the topic uses semantic partitioning.
Prerequisites
- A running Kafka cluster configured with a Kafka broker listener using TLS authentication and encryption.
- A running Topic Operator (typically deployed with the Entity Operator).
- For deleting a topic, delete.topic.enable=true (default) in the spec.kafka.config of the Kafka resource.
Procedure
1. Prepare a file containing the KafkaTopic to be created.
   Tip: When modifying a topic, you can get the current version of the resource using oc get kafkatopic orders -o yaml.
2. Create the KafkaTopic resource in OpenShift.
   oc apply -f TOPIC-CONFIG-FILE
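The file prepared in the first step of this procedure might look like the following sketch, which describes a topic with 10 partitions and 2 replicas. The topic name orders is taken from the tip in this procedure; the cluster name my-cluster and the v1beta2 API version are assumptions to adjust for your deployment.

```yaml
# Sketch of a KafkaTopic for a topic with 10 partitions and 2 replicas.
# my-cluster is a placeholder for the name of your Kafka cluster.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 2
```

Saving this content to a file and passing that file to oc apply -f creates the topic through the Topic Operator.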
5.2.4. Configuring the Topic Operator with resource requests and limits
You can allocate resources, such as CPU and memory, to the Topic Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
1. Update the Kafka cluster configuration in an editor, as required:
   oc edit kafka MY-CLUSTER
2. In the spec.entityOperator.topicOperator.resources property in the Kafka resource, set the resource requests and limits for the Topic Operator.
3. Apply the new configuration to create or update the resource.
   oc apply -f KAFKA-CONFIG-FILE
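The following fragment sketches how requests and limits for the Topic Operator might be set in the Kafka resource. The CPU and memory values are illustrative assumptions rather than recommendations, and the remaining sections of the resource are omitted.

```yaml
# Fragment of a Kafka resource; the values shown are illustrative only.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  # ... kafka and zookeeper sections
  entityOperator:
    topicOperator:
      resources:
        requests:        # resources reserved for the Topic Operator container
          cpu: "1"
          memory: 500Mi
        limits:          # maximum resources the container can consume
          cpu: "1"
          memory: 500Mi
```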
5.3. Using the User Operator
When you create, modify or delete a user using the KafkaUser resource, the User Operator ensures those changes are reflected in the Kafka cluster.
The Deploying and Upgrading AMQ Streams on OpenShift guide provides instructions to deploy the User Operator.
For more information about the schema, see KafkaUser schema reference.
Authenticating and authorizing access to Kafka
Use KafkaUser to enable the authentication and authorization mechanisms that a specific client uses to access Kafka.
For more information on using KafkaUser to manage users and secure access to Kafka brokers, see Securing access to Kafka brokers.
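To make this concrete, a KafkaUser that combines TLS client authentication with simple ACL-based authorization could look roughly like the sketch below. The names my-user, my-cluster, and my-topic are placeholders, and the single read-only ACL is only an illustration of the structure, not a recommended policy.

```yaml
# Sketch of a KafkaUser with TLS authentication and one ACL rule.
# my-user, my-cluster and my-topic are placeholder names.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls                  # client authenticates with a TLS certificate
  authorization:
    type: simple               # ACL-based authorization
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Read        # allow reading from my-topic only
```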
5.3.1. Configuring the User Operator with resource requests and limits
You can allocate resources, such as CPU and memory, to the User Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
1. Update the Kafka cluster configuration in an editor, as required:
   oc edit kafka MY-CLUSTER
2. In the spec.entityOperator.userOperator.resources property in the Kafka resource, set the resource requests and limits for the User Operator.
3. Save the file and exit the editor. The Cluster Operator applies the changes automatically.
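As with the Topic Operator, the setting might look like the following fragment of the Kafka resource. The CPU and memory values are illustrative assumptions, and the other sections of the resource are omitted.

```yaml
# Fragment of a Kafka resource; the values shown are illustrative only.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  # ... kafka and zookeeper sections
  entityOperator:
    userOperator:
      resources:
        requests:        # resources reserved for the User Operator container
          cpu: "1"
          memory: 500Mi
        limits:          # maximum resources the container can consume
          cpu: "1"
          memory: 500Mi
```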
5.4. Monitoring operators using Prometheus metrics
AMQ Streams operators expose Prometheus metrics. The metrics are automatically enabled and contain information about:
- Number of reconciliations
- Number of Custom Resources the operator is processing
- Duration of reconciliations
- JVM metrics from the operators
Additionally, we provide an example Grafana dashboard.
For more information about Prometheus, see Introducing Metrics to Kafka in the Deploying and Upgrading AMQ Streams on OpenShift guide.
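If Prometheus is managed by the Prometheus Operator, one way to scrape the operator metrics is a PodMonitor resource. The sketch below rests on several assumptions that should be checked against your installation: the Cluster Operator pods carry the label strimzi.io/kind: cluster-operator, metrics are served at /metrics on a container port named http, and the operator runs in a namespace called myproject.

```yaml
# Sketch of a PodMonitor for Cluster Operator metrics (Prometheus Operator CRD).
# The label, port name and namespace are assumptions; verify them in your cluster.
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: cluster-operator-metrics
  labels:
    app: strimzi
spec:
  selector:
    matchLabels:
      strimzi.io/kind: cluster-operator   # assumed label on the operator pods
  namespaceSelector:
    matchNames:
      - myproject                         # placeholder namespace
  podMetricsEndpoints:
    - path: /metrics
      port: http                          # assumed name of the metrics port
```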