Chapter 4. Operators
4.1. Cluster Operator
4.1.1. Overview of the Cluster Operator component
AMQ Streams uses the Cluster Operator to deploy and manage clusters for:
- Kafka (including Zookeeper, Entity Operator and Kafka Exporter)
- Kafka Connect
- Kafka Mirror Maker
- Kafka Bridge
Custom resources are used to deploy the clusters.
For example, to deploy a Kafka cluster:
- A Kafka resource with the cluster configuration is created within the OpenShift cluster.
- The Cluster Operator deploys a corresponding Kafka cluster, based on what is declared in the Kafka resource.
The Cluster Operator can also deploy (through Entity Operator configuration of the Kafka resource):
- A Topic Operator to provide operator-style topic management through KafkaTopic custom resources
- A User Operator to provide operator-style user management through KafkaUser custom resources
For more information on the configuration options supported by the Kafka resource, see Section 3.1, “Kafka cluster configuration”.
On OpenShift, a Kafka Connect deployment can incorporate a Source2Image feature, which provides a convenient way to include connectors.
Example architecture for the Cluster Operator
4.1.2. Watch options for a Cluster Operator deployment
When the Cluster Operator is running, it starts to watch for updates of Kafka resources.
Depending on the deployment, the Cluster Operator can watch Kafka resources from:
- A single namespace
- Multiple namespaces
- All namespaces
AMQ Streams provides example YAML files to make the deployment process easier.
The Cluster Operator watches the following resources:
- Kafka for the Kafka cluster.
- KafkaConnect for the Kafka Connect cluster.
- KafkaConnectS2I for the Kafka Connect cluster with Source2Image support.
- KafkaMirrorMaker for the Kafka Mirror Maker instance.
- KafkaBridge for the Kafka Bridge instance.
When one of these resources is created in the OpenShift cluster, the operator gets the cluster description from the resource and starts creating a new cluster for the resource by creating the necessary OpenShift resources, such as StatefulSets, Services and ConfigMaps.
Each time a Kafka resource is updated, the operator performs corresponding updates on the OpenShift resources that make up the cluster for the resource.
Resources are either patched, or deleted and then recreated, so that the cluster reflects the desired state declared in the resource. This operation might cause a rolling update, which might lead to service disruption.
When a resource is deleted, the operator undeploys the cluster and deletes all related OpenShift resources.
4.1.3. Deploying the Cluster Operator to watch a single namespace
Prerequisites
- This procedure requires use of an OpenShift user account which is able to create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
- Modify the installation files according to the namespace the Cluster Operator is going to be installed in.
  On Linux, use:
  sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
  On macOS, use:
  sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
Procedure
Deploy the Cluster Operator:
oc apply -f install/cluster-operator -n my-namespace
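The effect of the sed substitution in the prerequisites can be tried out on a scratch copy before touching the real installation files. The following is a minimal sketch, not part of the product procedure: the temporary directory and the sample file are stand-ins created only for this illustration, and the attached -i.bak suffix is used because that form is accepted by both GNU sed (Linux) and BSD sed (macOS), at the cost of leaving a .bak backup file behind.

```shell
# Scratch directory standing in for install/cluster-operator (illustration only)
workdir="$(mktemp -d)"

# Sample RoleBinding modelled on the one shown in the RBAC section of this chapter
cat > "$workdir/020-RoleBinding-strimzi-cluster-operator.yaml" <<'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-namespaced
  apiGroup: rbac.authorization.k8s.io
EOF

# Same substitution as in the prerequisites; -i.bak (no space) works with GNU and BSD sed
sed -i.bak 's/namespace: .*/namespace: my-namespace/' "$workdir"/*RoleBinding*.yaml

# Print the rewritten subject namespace
grep 'namespace:' "$workdir/020-RoleBinding-strimzi-cluster-operator.yaml"
```

Only the namespace of the ServiceAccount subject changes; the rest of the file is left as it was.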
4.1.4. Deploying the Cluster Operator to watch multiple namespaces
Prerequisites
- This procedure requires use of an OpenShift user account which is able to create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
- Edit the installation files according to the namespace the Cluster Operator is going to be installed in.
  On Linux, use:
  sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
  On macOS, use:
  sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
Procedure
1. Edit the file install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml and in the environment variable STRIMZI_NAMESPACE list all the namespaces where the Cluster Operator should watch for resources. For example:

   apiVersion: apps/v1
   kind: Deployment
   spec:
     # ...
     template:
       spec:
         serviceAccountName: strimzi-cluster-operator
         containers:
         - name: strimzi-cluster-operator
           image: registry.redhat.io/amq7/amq-streams-operator:1.3.0
           imagePullPolicy: IfNotPresent
           env:
           - name: STRIMZI_NAMESPACE
             value: watched-namespace-1,watched-namespace-2,watched-namespace-3

2. For all namespaces which should be watched by the Cluster Operator (watched-namespace-1, watched-namespace-2, watched-namespace-3 in the above example), install the RoleBindings. Replace watched-namespace with the namespace used in the previous step.
   This can be done using oc apply:

   oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace
   oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace
   oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace

3. Deploy the Cluster Operator.
   This can be done using oc apply:

   oc apply -f install/cluster-operator -n my-namespace
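Because the three RoleBinding files have to be applied once per watched namespace, the step can be scripted. The sketch below is one possible way to do it, under stated assumptions: apply_rolebindings and the OC variable are hypothetical names introduced here, and by default the function only prints the oc commands (a dry run) so that the output can be reviewed before anything is applied.

```shell
# Hypothetical dry-run switch: by default the commands are only printed.
# Set OC="oc" in the environment to actually run them.
OC="${OC:-echo oc}"

# apply_rolebindings is an illustrative helper, not part of AMQ Streams.
# $1 is the same comma-separated list used for STRIMZI_NAMESPACE.
apply_rolebindings() {
  for ns in $(printf '%s' "$1" | tr ',' ' '); do
    for f in 020-RoleBinding-strimzi-cluster-operator.yaml \
             031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml \
             032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml; do
      # $OC is left unquoted on purpose so that "echo oc" splits into two words
      $OC apply -f "install/cluster-operator/$f" -n "$ns"
    done
  done
}

apply_rolebindings "watched-namespace-1,watched-namespace-2,watched-namespace-3"
```

Reusing the exact STRIMZI_NAMESPACE value as the argument keeps the list of watched namespaces and the list of namespaces that receive RoleBindings from drifting apart.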
4.1.5. Deploying the Cluster Operator to watch all namespaces
You can configure the Cluster Operator to watch AMQ Streams resources across all namespaces in your OpenShift cluster. When running in this mode, the Cluster Operator automatically manages clusters in any new namespaces that are created.
Prerequisites
- This procedure requires use of an OpenShift user account which is able to create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
- Your OpenShift cluster is running.
Procedure
1. Configure the Cluster Operator to watch all namespaces:
   - Edit the 050-Deployment-strimzi-cluster-operator.yaml file.
   - Set the value of the STRIMZI_NAMESPACE environment variable to *.

   apiVersion: apps/v1
   kind: Deployment
   spec:
     # ...
     template:
       spec:
         # ...
         serviceAccountName: strimzi-cluster-operator
         containers:
         - name: strimzi-cluster-operator
           image: registry.redhat.io/amq7/amq-streams-operator:1.3.0
           imagePullPolicy: IfNotPresent
           env:
           - name: STRIMZI_NAMESPACE
             value: "*"
           # ...

2. Create ClusterRoleBindings that grant cluster-wide access to all namespaces to the Cluster Operator.
   Use the oc create clusterrolebinding command:

   oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-namespace:strimzi-cluster-operator
   oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-namespace:strimzi-cluster-operator
   oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-namespace:strimzi-cluster-operator

   Replace my-namespace with the namespace in which you want to install the Cluster Operator.

3. Deploy the Cluster Operator to your OpenShift cluster.
   Use the oc apply command:

   oc apply -f install/cluster-operator -n my-namespace
4.1.6. Reconciliation
Although the operator reacts to all notifications about the desired cluster resources received from the OpenShift cluster, if the operator is not running, or if a notification is not received for any reason, the desired resources will get out of sync with the state of the running OpenShift cluster.
In order to handle failovers properly, the Cluster Operator runs a periodic reconciliation process that compares the state of the desired resources with the current cluster deployments and brings them back to a consistent state. You can set the time interval for the periodic reconciliations using the STRIMZI_FULL_RECONCILIATION_INTERVAL_MS variable.
4.1.7. Cluster Operator Configuration
The Cluster Operator can be configured through the following supported environment variables:
STRIMZI_NAMESPACE
- A comma-separated list of namespaces that the operator should operate in. When not set, set to an empty string, or set to *, the Cluster Operator operates in all namespaces. The Cluster Operator deployment might use the OpenShift Downward API to set this automatically to the namespace the Cluster Operator is deployed in. See the example below:

  env:
    - name: STRIMZI_NAMESPACE
      valueFrom:
        fieldRef:
          fieldPath: metadata.namespace

STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
- Optional, default is 120000 ms. The interval between periodic reconciliations, in milliseconds.
STRIMZI_LOG_LEVEL
- Optional, default INFO. The level for printing logging messages. The value can be set to: ERROR, WARNING, INFO, DEBUG, and TRACE.
STRIMZI_OPERATION_TIMEOUT_MS
- Optional, default 300000 ms. The timeout for internal operations, in milliseconds. This value should be increased when using AMQ Streams on clusters where regular OpenShift operations take longer than usual (because of slow downloading of Docker images, for example).
STRIMZI_KAFKA_IMAGES
- Required. This provides a mapping from the Kafka version to the corresponding Docker image containing a Kafka broker of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example 2.2.1=registry.redhat.io/amq7/amq-streams-kafka-22:1.3.0, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. This is used when a Kafka.spec.kafka.version property is specified but not the Kafka.spec.kafka.image, as described in Section 3.1.18, “Container images”.
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-operator:1.3.0. The image name to use as default for the init container started before the broker for initial configuration work (that is, rack support), if no image is specified as the kafka-init-image (see Section 3.1.18, “Container images”).
STRIMZI_DEFAULT_TLS_SIDECAR_KAFKA_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. The image name to use as the default when deploying the sidecar container which provides TLS support for Kafka, if no image is specified as the Kafka.spec.kafka.tlsSidecar.image (see Section 3.1.18, “Container images”).
STRIMZI_DEFAULT_ZOOKEEPER_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. The image name to use as the default when deploying Zookeeper, if no image is specified as the Kafka.spec.zookeeper.image (see Section 3.1.18, “Container images”).
STRIMZI_DEFAULT_TLS_SIDECAR_ZOOKEEPER_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. The image name to use as the default when deploying the sidecar container which provides TLS support for Zookeeper, if no image is specified as the Kafka.spec.zookeeper.tlsSidecar.image (see Section 3.1.18, “Container images”).
STRIMZI_KAFKA_CONNECT_IMAGES
- Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Connect of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example 2.2.1=registry.redhat.io/amq7/amq-streams-kafka-22:1.3.0, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. This is used when a KafkaConnect.spec.version property is specified but not the KafkaConnect.spec.image, as described in Section 3.2.11, “Container images”.
STRIMZI_KAFKA_CONNECT_S2I_IMAGES
- Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Connect of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example 2.2.1=registry.redhat.io/amq7/amq-streams-kafka-22:1.3.0, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. This is used when a KafkaConnectS2I.spec.version property is specified but not the KafkaConnectS2I.spec.image, as described in Section 3.3.11, “Container images”.
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
- Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Mirror Maker of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example 2.2.1=registry.redhat.io/amq7/amq-streams-kafka-22:1.3.0, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. This is used when a KafkaMirrorMaker.spec.version property is specified but not the KafkaMirrorMaker.spec.image, as described in Section 3.4.2.14, “Container images”.
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-operator:1.3.0. The image name to use as the default when deploying the Topic Operator, if no image is specified as the Kafka.spec.entityOperator.topicOperator.image of the Kafka resource (see Section 3.1.18, “Container images”).
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-operator:1.3.0. The image name to use as the default when deploying the User Operator, if no image is specified as the Kafka.spec.entityOperator.userOperator.image of the Kafka resource (see Section 3.1.18, “Container images”).
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
- Optional, default registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0. The image name to use as the default when deploying the sidecar container which provides TLS support for the Entity Operator, if no image is specified as the Kafka.spec.entityOperator.tlsSidecar.image (see Section 3.1.18, “Container images”).
STRIMZI_IMAGE_PULL_POLICY
- Optional. The ImagePullPolicy which will be applied to containers in all pods managed by the AMQ Streams Cluster Operator. The valid values are Always, IfNotPresent, and Never. If not specified, the OpenShift defaults will be used. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka Mirror Maker clusters.
STRIMZI_IMAGE_PULL_SECRETS
- Optional. A comma-separated list of Secret names. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are used in the imagePullSecrets field for all Pods created by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka Mirror Maker clusters.
4.1.8. Role-Based Access Control (RBAC)
4.1.8.1. Provisioning Role-Based Access Control (RBAC) for the Cluster Operator
For the Cluster Operator to function it needs permission within the OpenShift cluster to interact with resources such as Kafka, KafkaConnect, and so on, as well as the managed resources, such as ConfigMaps, Pods, Deployments, StatefulSets, Services, and so on. Such permission is described in terms of OpenShift role-based access control (RBAC) resources:
- ServiceAccount,
- Role and ClusterRole,
- RoleBinding and ClusterRoleBinding.
In addition to running under its own ServiceAccount with a ClusterRoleBinding, the Cluster Operator manages some RBAC resources for the components that need access to OpenShift resources.
OpenShift also includes privilege escalation protections that prevent components operating under one ServiceAccount from granting other ServiceAccounts privileges that the granting ServiceAccount does not have. Because the Cluster Operator must be able to create the ClusterRoleBindings and RoleBindings needed by resources it manages, the Cluster Operator must also have those same privileges.
4.1.8.2. Delegated privileges
When the Cluster Operator deploys resources for a desired Kafka resource it also creates ServiceAccounts, RoleBindings, and ClusterRoleBindings, as follows:
- The Kafka broker pods use a ServiceAccount called cluster-name-kafka
  - When the rack feature is used, the strimzi-cluster-name-kafka-init ClusterRoleBinding is used to grant this ServiceAccount access to the nodes within the cluster via a ClusterRole called strimzi-kafka-broker
  - When the rack feature is not used no binding is created.
- The Zookeeper pods use the default ServiceAccount, as they do not need access to the OpenShift resources.
- The Topic Operator pod uses a ServiceAccount called cluster-name-topic-operator
  - The Topic Operator produces OpenShift events with status information, so the ServiceAccount is bound to a ClusterRole called strimzi-topic-operator which grants this access via the strimzi-topic-operator-role-binding RoleBinding.
The pods for KafkaConnect and KafkaConnectS2I resources use the default ServiceAccount, as they do not require access to the OpenShift resources.
4.1.8.3. ServiceAccount
The Cluster Operator is best run using a ServiceAccount:
Example ServiceAccount for the Cluster Operator

apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi

The Deployment of the operator then needs to specify this in its spec.template.spec.serviceAccountName:
Partial example of Deployment for the Cluster Operator

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
spec:
  replicas: 1
  selector:
    matchLabels:
      name: strimzi-cluster-operator
      strimzi.io/kind: cluster-operator
  template:
    # ...

The strimzi-cluster-operator ServiceAccount is specified as the serviceAccountName within the template section, which is elided in this partial example.
4.1.8.4. ClusterRoles
The Cluster Operator needs to operate using ClusterRoles that give access to the necessary resources. Depending on the OpenShift cluster setup, a cluster administrator might be needed to create the ClusterRoles.
Cluster administrator rights are only needed for the creation of the ClusterRoles. The Cluster Operator will not run under the cluster admin account.
The ClusterRoles follow the principle of least privilege and contain only those privileges needed by the Cluster Operator to operate Kafka, Kafka Connect, and Zookeeper clusters. The first set of assigned privileges allows the Cluster Operator to manage OpenShift resources such as StatefulSets, Deployments, Pods, and ConfigMaps.
The Cluster Operator uses ClusterRoles to grant permission at the namespace-scoped resources level and the cluster-scoped resources level:
ClusterRole with namespaced resources for the Cluster Operator
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-namespaced
  labels:
    app: strimzi
rules:
- apiGroups:
  - ""
  resources:
  - serviceaccounts
  verbs:
  - get
  - create
  - delete
  - patch
  - update
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - rolebindings
  verbs:
  - get
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - kafka.strimzi.io
  resources:
  - kafkas
  - kafkas/status
  - kafkaconnects
  - kafkaconnects/status
  - kafkaconnects2is
  - kafkaconnects2is/status
  - kafkamirrormakers
  - kafkamirrormakers/status
  - kafkabridges
  - kafkabridges/status
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - list
  - watch
  - delete
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - endpoints
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - extensions
  resources:
  - deployments
  - deployments/scale
  - replicasets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/scale
  - deployments/status
  - statefulsets
  - replicasets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
- apiGroups:
  - extensions
  resources:
  - replicationcontrollers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - apps.openshift.io
  resources:
  - deploymentconfigs
  - deploymentconfigs/scale
  - deploymentconfigs/status
  - deploymentconfigs/finalizers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - build.openshift.io
  resources:
  - buildconfigs
  - builds
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - watch
  - update
- apiGroups:
  - image.openshift.io
  resources:
  - imagestreams
  - imagestreams/status
  verbs:
  - create
  - delete
  - get
  - list
  - watch
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - replicationcontrollers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
  - list
  - create
  - delete
  - patch
  - update
- apiGroups:
  - extensions
  resources:
  - networkpolicies
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - networking.k8s.io
  resources:
  - networkpolicies
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - route.openshift.io
  resources:
  - routes
  - routes/custom-host
  verbs:
  - get
  - list
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - persistentvolumeclaims
  verbs:
  - get
  - list
  - create
  - delete
  - patch
  - update
- apiGroups:
  - policy
  resources:
  - poddisruptionbudgets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - extensions
  resources:
  - ingresses
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
The second includes the permissions needed for cluster-scoped resources.
ClusterRole with cluster-scoped resources for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-global
  labels:
    app: strimzi
rules:
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - clusterrolebindings
  verbs:
  - get
  - create
  - delete
  - patch
  - update
  - watch
- apiGroups:
  - storage.k8s.io
  resources:
  - storageclasses
  verbs:
  - get
The strimzi-kafka-broker ClusterRole represents the access needed by the init container in Kafka pods that is used for the rack feature. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka broker pods

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-broker
  labels:
    app: strimzi
rules:
- apiGroups:
  - ""
  resources:
  - nodes
  verbs:
  - get
The strimzi-topic-operator ClusterRole represents the access needed by the Topic Operator. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.
ClusterRole for the Cluster Operator allowing it to delegate access to events to the Topic Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-entity-operator
  labels:
    app: strimzi
rules:
- apiGroups:
  - kafka.strimzi.io
  resources:
  - kafkatopics
  - kafkatopics/status
  verbs:
  - get
  - list
  - watch
  - create
  - patch
  - update
  - delete
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
- apiGroups:
  - kafka.strimzi.io
  resources:
  - kafkausers
  - kafkausers/status
  verbs:
  - get
  - list
  - watch
  - create
  - patch
  - update
  - delete
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
  - list
  - create
  - patch
  - update
  - delete
4.1.8.5. ClusterRoleBindings
The operator needs ClusterRoleBindings and RoleBindings which associate its ClusterRoles with its ServiceAccount. ClusterRoleBindings are needed for ClusterRoles containing cluster-scoped resources.
Example ClusterRoleBinding for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-global
  apiGroup: rbac.authorization.k8s.io
ClusterRoleBindings are also needed for the ClusterRoles needed for delegation:
Example ClusterRoleBinding for delegation by the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-broker-delegation
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-broker
  apiGroup: rbac.authorization.k8s.io
ClusterRoles containing only namespaced resources are bound using RoleBindings only.

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-namespaced
  apiGroup: rbac.authorization.k8s.io

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-entity-operator-delegation
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-entity-operator
  apiGroup: rbac.authorization.k8s.io
4.2. Topic Operator
4.2.1. Overview of the Topic Operator component
The Topic Operator provides a way of managing topics in a Kafka cluster via OpenShift resources.
Example architecture for the Topic Operator
The role of the Topic Operator is to keep a set of KafkaTopic OpenShift resources describing Kafka topics in-sync with corresponding Kafka topics.
Specifically, if a KafkaTopic is:
- Created, the operator will create the topic it describes
- Deleted, the operator will delete the topic it describes
- Changed, the operator will update the topic it describes
And also, in the other direction, if a topic is:
- Created within the Kafka cluster, the operator will create a KafkaTopic describing it
- Deleted from the Kafka cluster, the operator will delete the KafkaTopic describing it
- Changed in the Kafka cluster, the operator will update the KafkaTopic describing it
This allows you to declare a KafkaTopic as part of your application’s deployment and the Topic Operator will take care of creating the topic for you. Your application just needs to deal with producing or consuming from the necessary topics.
If the topic is reconfigured or reassigned to different Kafka nodes, the KafkaTopic will always be up to date.
For more details about creating, modifying and deleting topics, see Chapter 5, Using the Topic Operator.
4.2.2. Identifying a Kafka cluster for topic handling
A KafkaTopic resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster

The label is used by the Topic Operator to identify the KafkaTopic resource and create a new topic, and also in subsequent handling of the topic.
If the label does not match the Kafka cluster, the Topic Operator cannot identify the KafkaTopic and the topic is not created.
4.2.3. Understanding the Topic Operator
A fundamental problem that the operator has to solve is that there is no single source of truth: both the KafkaTopic resource and the topic within Kafka can be modified independently of the operator. Complicating this, the Topic Operator might not always be able to observe changes at each end in real time (for example, the operator might be down).
To resolve this, the operator maintains its own private copy of the information about each topic. When a change happens either in the Kafka cluster, or in OpenShift, it looks at both the state of the other system and at its private copy in order to determine what needs to change to keep everything in sync. The same thing happens whenever the operator starts, and periodically while it is running.
For example, suppose the Topic Operator is not running, and a KafkaTopic my-topic gets created. When the operator starts it will lack a private copy of "my-topic", so it can infer that the KafkaTopic has been created since it was last running. The operator will create the topic corresponding to "my-topic" and also store a private copy of the metadata for "my-topic".
The private copy allows the operator to cope with scenarios where the topic configuration gets changed both in Kafka and in OpenShift, so long as the changes are not incompatible (for example, both changing the same topic config key, but to different values). In the case of incompatible changes, the Kafka configuration wins, and the KafkaTopic will be updated to reflect that.
The private copy is held in the same Zookeeper ensemble used by Kafka itself. This mitigates availability concerns, because if Zookeeper is not running then Kafka itself cannot run, so the operator is no less available than it would be if it were stateless.
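The way a private copy makes it possible to tell which side changed can be sketched for a single topic configuration key. This is an illustration of the reasoning described above under simplifying assumptions, not the Topic Operator's implementation: resolve_key and the sample values are hypothetical, and a real reconciliation covers whole topics rather than one value.

```shell
# resolve_key is an illustrative helper, not part of AMQ Streams.
# Arguments: the value in the private copy, the value in Kafka, the value in the KafkaTopic.
# It prints the value that both sides should end up with.
resolve_key() {
  private="$1"; kafka="$2"; kube="$3"
  if [ "$kafka" = "$kube" ]; then
    # Both sides agree: nothing to reconcile
    printf '%s\n' "$kafka"
  elif [ "$kafka" = "$private" ]; then
    # Kafka still matches the private copy, so only the KafkaTopic changed
    printf '%s\n' "$kube"
  elif [ "$kube" = "$private" ]; then
    # The KafkaTopic still matches the private copy, so only Kafka changed
    printf '%s\n' "$kafka"
  else
    # Both changed to different values: the Kafka configuration wins
    printf '%s\n' "$kafka"
  fi
}

resolve_key 1000 1000 2000   # only the KafkaTopic changed
resolve_key 1000 3000 2000   # incompatible changes on both sides
```

Without the private copy, the second and third branches could not be told apart, because the operator would only see that the two sides differ.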
4.2.4. Deploying the Topic Operator using the Cluster Operator
This procedure describes how to deploy the Topic Operator using the Cluster Operator. If you want to use the Topic Operator with a Kafka cluster that is not managed by AMQ Streams, you must deploy the Topic Operator as a standalone component. For more information, see Section 4.2.6, “Deploying the standalone Topic Operator”.
Prerequisites
- A running Cluster Operator
- A Kafka resource to be created or updated
Procedure
1. Ensure that the Kafka.spec.entityOperator object exists in the Kafka resource. This configures the Entity Operator.

   apiVersion: kafka.strimzi.io/v1beta1
   kind: Kafka
   metadata:
     name: my-cluster
   spec:
     #...
     entityOperator:
       topicOperator: {}
       userOperator: {}

2. Configure the Topic Operator using the fields described in Section C.52, “EntityTopicOperatorSpec schema reference”.
3. Create or update the Kafka resource in OpenShift.
   Use oc apply:

   oc apply -f your-file
Additional resources
- For more information about deploying the Cluster Operator, see Section 2.3, “Cluster Operator”.
- For more information about deploying the Entity Operator, see Section 3.1.11, “Entity Operator”.
- For more information about the Kafka.spec.entityOperator object used to configure the Topic Operator when deployed by the Cluster Operator, see Section C.51, “EntityOperatorSpec schema reference”.
4.2.5. Configuring the Topic Operator with resource requests and limits
You can allocate resources, such as CPU and memory, to the Topic Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
1. Update the Kafka cluster configuration in an editor, as required.
   Use oc edit:

   oc edit kafka my-cluster

2. In the spec.entityOperator.topicOperator.resources property in the Kafka resource, set the resource requests and limits for the Topic Operator.

   apiVersion: kafka.strimzi.io/v1beta1
   kind: Kafka
   spec:
     # kafka and zookeeper sections...
     entityOperator:
       topicOperator:
         resources:
           requests:
             cpu: "1"
             memory: 500Mi
           limits:
             cpu: "1"
             memory: 500Mi

3. Apply the new configuration to create or update the resource.
   Use oc apply:

   oc apply -f kafka.yaml
Additional resources
- For more information about the schema of the resources object, see Section C.36, “ResourceRequirements schema reference”.
4.2.6. Deploying the standalone Topic Operator
Deploying the Topic Operator as a standalone component is more complicated than installing it using the Cluster Operator, but it is more flexible. For instance, it can operate with any Kafka cluster, not necessarily one deployed by the Cluster Operator.
Prerequisites
- An existing Kafka cluster for the Topic Operator to connect to.
Procedure
Edit the install/topic-operator/05-Deployment-strimzi-topic-operator.yaml resource. You will need to change the following:
- The STRIMZI_KAFKA_BOOTSTRAP_SERVERS environment variable in Deployment.spec.template.spec.containers[0].env should be set to a list of bootstrap brokers in your Kafka cluster, given as a comma-separated list of hostname:port pairs.
- The STRIMZI_ZOOKEEPER_CONNECT environment variable in Deployment.spec.template.spec.containers[0].env should be set to a list of the Zookeeper nodes, given as a comma-separated list of hostname:port pairs. This should be the same Zookeeper cluster that your Kafka cluster is using.
- The STRIMZI_NAMESPACE environment variable in Deployment.spec.template.spec.containers[0].env should be set to the OpenShift namespace in which you want the operator to watch for KafkaTopic resources.
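As a rough illustration of the three settings listed in this step, the relevant part of the Deployment might look like the following sketch. The addresses my-kafka-bootstrap:9092 and my-zookeeper:2181 and the namespace my-namespace are placeholders assumed for this example; they are not taken from the installation file and must be replaced with the values for your own cluster.

```yaml
# Sketch of the env section in
# install/topic-operator/05-Deployment-strimzi-topic-operator.yaml
spec:
  template:
    spec:
      containers:
        # First container; image and other settings left unchanged
        - env:
            # Comma-separated hostname:port pairs of Kafka bootstrap brokers (placeholder)
            - name: STRIMZI_KAFKA_BOOTSTRAP_SERVERS
              value: my-kafka-bootstrap:9092
            # Same Zookeeper cluster that the Kafka cluster uses (placeholder)
            - name: STRIMZI_ZOOKEEPER_CONNECT
              value: my-zookeeper:2181
            # Namespace watched for KafkaTopic resources (placeholder)
            - name: STRIMZI_NAMESPACE
              value: my-namespace
```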
Deploy the Topic Operator.
This can be done using oc apply:
oc apply -f install/topic-operator
Verify that the Topic Operator has been deployed successfully. This can be done using oc describe:
oc describe deployment strimzi-topic-operator
The Topic Operator is deployed once the Replicas: entry shows 1 available.
Note: This could take some time if you have a slow connection to OpenShift and the images have not been downloaded before.
Additional resources
- For more information about the environment variables used to configure the Topic Operator, see Section 4.2.7, “Topic Operator environment”.
- For more information about getting the Cluster Operator to deploy the Topic Operator for you, see Section 2.9.2, “Deploying the Topic Operator using the Cluster Operator”.
4.2.7. Topic Operator environment
When deployed standalone, the Topic Operator can be configured using environment variables.
When deployed by the Cluster Operator, the Topic Operator should instead be configured using the Kafka.spec.entityOperator.topicOperator property.
STRIMZI_RESOURCE_LABELS
- The label selector used to identify KafkaTopics to be managed by the operator.
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
- The Zookeeper session timeout, in milliseconds. For example, 10000. Default 20000 (20 seconds).
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
- The list of Kafka bootstrap servers. This variable is mandatory.
STRIMZI_ZOOKEEPER_CONNECT
- The Zookeeper connection information. This variable is mandatory.
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
- The interval between periodic reconciliations, in milliseconds.
STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS
- The number of attempts at getting topic metadata from Kafka. The time between each attempt is defined as an exponential back-off. Consider increasing this value when topic creation could take more time due to the number of partitions or replicas. Default 6.
STRIMZI_LOG_LEVEL
- The level for printing logging messages. The value can be set to: ERROR, WARNING, INFO, DEBUG, and TRACE. Default INFO.
STRIMZI_TLS_ENABLED
- Enables TLS support for encrypting the communication with Kafka brokers. Default true.
STRIMZI_TRUSTSTORE_LOCATION
- The path to the truststore containing certificates for enabling TLS-based communication. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
STRIMZI_TRUSTSTORE_PASSWORD
- The password for accessing the truststore defined by STRIMZI_TRUSTSTORE_LOCATION. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
STRIMZI_KEYSTORE_LOCATION
- The path to the keystore containing private keys for enabling TLS-based communication. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
STRIMZI_KEYSTORE_PASSWORD
- The password for accessing the keystore defined by STRIMZI_KEYSTORE_LOCATION. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
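To show how the optional variables might be set on a standalone deployment, here is a hedged sketch of an env fragment. All values are illustrative assumptions: the label selector syntax (key=value) and the reconciliation interval in particular are not stated in this section, and the mandatory connection variables are omitted for brevity. Kubernetes requires environment variable values to be strings, so numbers and booleans are quoted.

```yaml
# Illustrative env fragment for a standalone Topic Operator container
env:
  # Label selector for the KafkaTopics to manage (selector syntax assumed)
  - name: STRIMZI_RESOURCE_LABELS
    value: "strimzi.io/cluster=my-cluster"
  # Zookeeper session timeout in milliseconds (the documented default)
  - name: STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
    value: "20000"
  # Periodic reconciliation interval in milliseconds (illustrative value)
  - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
    value: "120000"
  # Attempts at fetching topic metadata (the documented default)
  - name: STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS
    value: "6"
  - name: STRIMZI_LOG_LEVEL
    value: INFO
  # With TLS disabled, the truststore and keystore variables are not required
  - name: STRIMZI_TLS_ENABLED
    value: "false"
```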
4.3. User Operator
The User Operator manages Kafka users through custom resources.
4.3.1. Overview of the User Operator component
The User Operator manages Kafka users for a Kafka cluster by watching for KafkaUser resources that describe Kafka users and ensuring that they are configured properly in the Kafka cluster. For example:
- if a KafkaUser is created, the User Operator will create the user it describes
- if a KafkaUser is deleted, the User Operator will delete the user it describes
- if a KafkaUser is changed, the User Operator will update the user it describes
Unlike the Topic Operator, the User Operator does not sync any changes from the Kafka cluster back to the OpenShift resources. Kafka topics might be created by applications directly in Kafka, but users are not expected to be managed directly in the Kafka cluster in parallel with the User Operator, so this synchronization should not be needed.
The User Operator allows you to declare a KafkaUser as part of your application’s deployment. When the user is created, the user credentials will be created in a Secret. Your application needs to use the user and its credentials for authentication and to produce or consume messages.
In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user’s rights in the KafkaUser declaration.
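A KafkaUser declaration that combines authentication and authorization might look like the following minimal sketch. It assumes TLS client authentication and simple ACL-based authorization; the topic name my-topic and the Read operation are illustrative choices, not values from this guide.

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    # Name of the Kafka resource that this user belongs to
    strimzi.io/cluster: my-cluster
spec:
  # Credentials for this user are created in a Secret
  authentication:
    type: tls
  # Description of the user's rights (illustrative ACL)
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Read
```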
4.3.2. Identifying a Kafka cluster for user handling
A KafkaUser resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
The label is used by the User Operator to identify the KafkaUser resource and create a new user, and also in subsequent handling of the user.
If the label does not match the Kafka cluster, the User Operator cannot identify the KafkaUser and the user is not created.
4.3.3. Deploying the User Operator using the Cluster Operator
Prerequisites
- A running Cluster Operator
- A Kafka resource to be created or updated.
Procedure
- Edit the Kafka resource, ensuring it has a Kafka.spec.entityOperator.userOperator object that configures the User Operator how you want.
- Create or update the Kafka resource in OpenShift.
This can be done using oc apply:
oc apply -f your-file
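For orientation, a Kafka resource that asks the Cluster Operator to deploy the User Operator could be sketched as follows. The empty userOperator object is an assumption meaning "use default settings"; the cluster name my-cluster is the example name used elsewhere in this chapter.

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  # kafka and zookeeper sections...
  entityOperator:
    # An empty object requests the User Operator with default configuration (assumed)
    userOperator: {}
```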
Additional resources
- For more information about deploying the Cluster Operator, see Section 2.3, “Cluster Operator”.
- For more information about the Kafka.spec.entityOperator object used to configure the User Operator when deployed by the Cluster Operator, see EntityOperatorSpec schema reference.
4.3.4. Configuring the User Operator with resource requests and limits
You can allocate resources, such as CPU and memory, to the User Operator and set a limit on the amount of resources it can consume.
Prerequisites
- The Cluster Operator is running.
Procedure
Update the Kafka cluster configuration in an editor, as required:
oc edit kafka my-cluster
In the spec.entityOperator.userOperator.resources property in the Kafka resource, set the resource requests and limits for the User Operator.
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  # kafka and zookeeper sections...
  entityOperator:
    userOperator:
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi
Save the file and exit the editor. The Cluster Operator will apply the changes automatically.
Additional resources
- For more information about the schema of the resources object, see Section C.36, “ResourceRequirements schema reference”.
4.3.5. Deploying the standalone User Operator
Deploying the User Operator as a standalone component is more complicated than installing it using the Cluster Operator, but it is more flexible. For instance, it can operate with any Kafka cluster, not necessarily one deployed by the Cluster Operator.
Prerequisites
- An existing Kafka cluster for the User Operator to connect to.
Procedure
Edit the install/user-operator/05-Deployment-strimzi-user-operator.yaml resource. You will need to change the following:
- The STRIMZI_CA_CERT_NAME environment variable in Deployment.spec.template.spec.containers[0].env should be set to point to an OpenShift Secret that contains the public key of the Certificate Authority used for signing new user certificates for TLS client authentication. The Secret should contain the public key under the key ca.crt.
- The STRIMZI_CA_KEY_NAME environment variable in Deployment.spec.template.spec.containers[0].env should be set to point to an OpenShift Secret that contains the private key of the Certificate Authority used for signing new user certificates for TLS client authentication. The Secret should contain the private key under the key ca.key.
- The STRIMZI_ZOOKEEPER_CONNECT environment variable in Deployment.spec.template.spec.containers[0].env should be set to a list of the Zookeeper nodes, given as a comma-separated list of hostname:port pairs. This should be the same Zookeeper cluster that your Kafka cluster is using.
- The STRIMZI_NAMESPACE environment variable in Deployment.spec.template.spec.containers[0].env should be set to the OpenShift namespace in which you want the operator to watch for KafkaUser resources.
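The four settings listed in this step could be sketched as the following env fragment. The Secret names my-clients-ca-cert and my-clients-ca, the Zookeeper address my-zookeeper:2181, and the namespace my-namespace are hypothetical placeholders introduced for this example; substitute the names used in your environment.

```yaml
# Sketch of the env section in
# install/user-operator/05-Deployment-strimzi-user-operator.yaml
spec:
  template:
    spec:
      containers:
        # First container; image and other settings left unchanged
        - env:
            # Secret holding the CA public key under the key ca.crt (hypothetical name)
            - name: STRIMZI_CA_CERT_NAME
              value: my-clients-ca-cert
            # Secret holding the CA private key under the key ca.key (hypothetical name)
            - name: STRIMZI_CA_KEY_NAME
              value: my-clients-ca
            # Same Zookeeper cluster that the Kafka cluster uses (placeholder)
            - name: STRIMZI_ZOOKEEPER_CONNECT
              value: my-zookeeper:2181
            # Namespace watched for KafkaUser resources (placeholder)
            - name: STRIMZI_NAMESPACE
              value: my-namespace
```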
Deploy the User Operator.
This can be done using oc apply:
oc apply -f install/user-operator
Verify that the User Operator has been deployed successfully. This can be done using oc describe:
oc describe deployment strimzi-user-operator
The User Operator is deployed once the Replicas: entry shows 1 available.
Note: This could take some time if you have a slow connection to OpenShift and the images have not been downloaded before.
Additional resources
- For more information about getting the Cluster Operator to deploy the User Operator for you, see Section 2.10.2, “Deploying the User Operator using the Cluster Operator”.