Chapter 7. Kafka configuration


A deployment of Kafka components to an OpenShift cluster using AMQ Streams is highly configurable through the application of custom resources. Custom resources are created as instances of APIs added by Custom resource definitions (CRDs) to extend OpenShift resources.

CRDs act as configuration instructions to describe the custom resources in an OpenShift cluster, and are provided with AMQ Streams for each Kafka component used in a deployment, as well as users and topics. CRDs and custom resources are defined as YAML files. Example YAML files are provided with the AMQ Streams distribution.

CRDs also allow AMQ Streams resources to benefit from native OpenShift features like CLI accessibility and configuration validation.

In this section we look at how Kafka components are configured through custom resources, starting with common configuration points and then important configuration considerations specific to components.

AMQ Streams provides example configuration files, which can serve as a starting point when building your own Kafka component configuration for deployment.

7.1. Custom resources

After a new custom resource type is added to your cluster by installing a CRD, you can create instances of the resource based on its specification.

The custom resources for AMQ Streams components have common configuration properties, which are defined under spec.

In this fragment from a Kafka topic custom resource, the apiVersion and kind properties identify the associated CRD. The spec property shows configuration that defines the number of partitions and replicas for the topic.

Kafka topic custom resource

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  # ...

There are many additional configuration options that can be incorporated into a YAML definition, some common and some specific to a particular component.

7.2. Common configuration

Some of the configuration options common to resources are described here. Security and metrics collection might also be adopted where applicable.

Bootstrap servers

Bootstrap servers are used for host/port connection to a Kafka cluster for:

  • Kafka Connect
  • Kafka Bridge
  • Kafka MirrorMaker producers and consumers

CPU and memory resources

You request CPU and memory resources for components. Limits specify the maximum resources that can be consumed by a given container.

Resource requests and limits for the Topic Operator and User Operator are set in the Kafka resource (see the sketch after the common configuration example below).

Logging
You define the logging level for the component. Logging can be defined directly (inline) or externally using a config map.
Healthchecks
Healthcheck configuration introduces liveness and readiness probes to know when to restart a container (liveness) and when a container can accept traffic (readiness).
JVM options
JVM options provide maximum and minimum memory allocation to optimize the performance of the component according to the platform it is running on.
Pod scheduling
Pod scheduling uses affinity/anti-affinity rules to determine under what circumstances a pod is scheduled onto a node.

Example YAML showing common configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-cluster
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  jvmOptions:
    "-Xmx": "2g"
    "-Xms": "2g"
  template:
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-type
                    operator: In
                    values:
                      - fast-network
  # ...
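
The resource requests and limits for the Topic Operator and User Operator mentioned above are configured under entityOperator in the Kafka resource. The following is a minimal sketch; the CPU and memory values are illustrative only.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ...
  entityOperator:
    topicOperator:
      resources:
        requests:
          cpu: "1"
          memory: 512Mi
        limits:
          cpu: "1"
          memory: 512Mi
    userOperator:
      resources:
        requests:
          cpu: "1"
          memory: 512Mi
        limits:
          cpu: "1"
          memory: 512Mi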

7.3. Kafka cluster configuration

A Kafka cluster comprises one or more brokers. For producers and consumers to be able to access topics within the brokers, Kafka configuration must define how data is stored in the cluster, and how the data is accessed. You can configure a Kafka cluster to run with multiple broker nodes across racks.

Storage

Kafka and ZooKeeper store data on disks.

AMQ Streams requires block storage provisioned through StorageClass. The file system format for storage must be XFS or EXT4. Three types of data storage are supported:

Ephemeral (Recommended for development only)
Ephemeral storage stores data for the lifetime of an instance. Data is lost when the instance is restarted.
Persistent
Persistent storage relates to long-term data storage independent of the lifecycle of the instance.
JBOD (Just a Bunch of Disks, suitable for Kafka only)
JBOD allows you to use multiple disks to store commit logs in each broker.

The disk capacity used by an existing Kafka cluster can be increased if supported by the infrastructure.
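
For example, JBOD storage for the Kafka brokers can be defined as a set of volumes. The following is a minimal sketch; the volume sizes are illustrative and depend on your infrastructure.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
        - id: 1
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
    # ...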

Listeners

Listeners configure how clients connect to a Kafka cluster.

By specifying a unique name and port for each listener within a Kafka cluster, you can configure multiple listeners.

The following types of listener are supported:

  • Internal listeners for access within OpenShift
  • External listeners for access outside of OpenShift

You can enable TLS encryption for listeners, and configure authentication.

Internal listeners expose Kafka by specifying an internal type:

  • internal to connect within the same OpenShift cluster
  • cluster-ip to expose Kafka using per-broker ClusterIP services

External listeners expose Kafka by specifying an external type:

  • route to use OpenShift routes and the default HAProxy router
  • loadbalancer to use loadbalancer services
  • nodeport to use ports on OpenShift nodes
  • ingress to use OpenShift Ingress and the Ingress NGINX Controller for Kubernetes
Note

With the cluster-ip type, you can add your own access mechanism. For example, you can use the listener with a custom Ingress controller or the OpenShift Gateway API.

If you are using OAuth 2.0 for token-based authentication, you can configure listeners to use the authorization server.
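
For example, a listener can be configured with oauth authentication that points to the authorization server. The following fragment (under spec.kafka in the Kafka resource) is a minimal sketch; the issuer and JWKS endpoint URIs are placeholders for your authorization server.

listeners:
  - name: external1
    port: 9094
    type: route
    tls: true
    authentication:
      type: oauth
      validIssuerUri: https://<auth_server_address>/realms/my-realm
      jwksEndpointUri: https://<auth_server_address>/realms/my-realm/protocol/openid-connect/certs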

Rack awareness
Racks represent data centers, or racks in data centers, or availability zones. Configure rack awareness to distribute Kafka broker pods and topic replicas across racks. Enable rack awareness using the rack property to specify a topologyKey. The topologyKey is the name of the label assigned to OpenShift worker nodes, which identifies the rack.

AMQ Streams assigns a rack ID to each Kafka broker. Kafka brokers use the IDs to spread partition replicas across racks. You can also specify the RackAwareReplicaSelector selector plugin to use with rack awareness. The plugin matches the rack IDs of brokers and consumers, so that messages are consumed from the closest replica. To use the plugin, consumers must also have rack awareness enabled.

You can also enable rack awareness in Kafka Connect, MirrorMaker 2, and the Kafka Bridge, as shown in the sketch after the following example.

Example YAML showing Kafka configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external1
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    # ...
    storage:
      type: persistent-claim
      size: 10000Gi
    # ...
    rack:
      topologyKey: topology.kubernetes.io/zone
    config:
      replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
    # ...
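
Rack awareness for client components uses the same rack property in their custom resources. The following is a minimal sketch for Kafka Connect, assuming the worker nodes are labeled with topology.kubernetes.io/zone.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  rack:
    topologyKey: topology.kubernetes.io/zone
  # ...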

7.4. Kafka MirrorMaker configuration

Kafka MirrorMaker replicates data between two or more active Kafka clusters, within or across data centers. To set up MirrorMaker, a source and target (destination) Kafka cluster must be running.

7.4.1. MirrorMaker 2 configuration

MirrorMaker 2 consumes messages from a source Kafka cluster and writes them to a target Kafka cluster.

MirrorMaker 2 uses:

  • Source cluster configuration to consume data from the source cluster
  • Target cluster configuration to output data to the target cluster

MirrorMaker 2 is based on the Kafka Connect framework, with connectors managing the transfer of data between clusters.

You configure MirrorMaker 2 to define the Kafka Connect deployment, including the connection details of the source and target clusters, and then run a set of MirrorMaker 2 connectors to make the connection.

MirrorMaker 2 consists of the following connectors:

MirrorSourceConnector
The source connector replicates topics from a source cluster to a target cluster. It also replicates ACLs and is necessary for the MirrorCheckpointConnector to run.
MirrorCheckpointConnector
The checkpoint connector periodically tracks offsets. If enabled, it also synchronizes consumer group offsets between the source and target cluster.
MirrorHeartbeatConnector
The heartbeat connector periodically checks connectivity between the source and target cluster.
Note

If you are using the User Operator to manage ACLs, ACL replication through the connector is not possible.

The process of mirroring data from a source cluster to a target cluster is asynchronous. Each MirrorMaker 2 instance mirrors data from one source cluster to one target cluster. You can use more than one MirrorMaker 2 instance to mirror data between any number of clusters.

Figure 7.1. Replication across two clusters

By default, a check for new topics in the source cluster is made every 10 minutes. You can change the frequency by adding refresh.topics.interval.seconds to the source connector configuration.
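
Connector options such as this are set in the config section of each connector within a mirror definition. The following fragment of the KafkaMirrorMaker2 resource is a minimal sketch; the refresh interval shown is illustrative, and consumer group offset synchronization through the checkpoint connector is optional.

mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      config:
        refresh.topics.interval.seconds: 60
    checkpointConnector:
      config:
        sync.group.offsets.enabled: "true"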

7.4.1.1. Cluster configuration

You can use MirrorMaker 2 in active/passive or active/active cluster configurations.

active/active cluster configuration
An active/active configuration has two active clusters replicating data bidirectionally. Applications can use either cluster. Each cluster can provide the same data. In this way, you can make the same data available in different geographical locations. As consumer groups are active in both clusters, consumer offsets for replicated topics are not synchronized back to the source cluster.
active/passive cluster configuration
An active/passive configuration has an active cluster replicating data to a passive cluster. The passive cluster remains on standby. You might use the passive cluster for data recovery in the event of system failure.

The expectation is that producers and consumers connect to active clusters only. A MirrorMaker 2 cluster is required at each target destination.

7.4.1.2. Bidirectional replication (active/active)

The MirrorMaker 2 architecture supports bidirectional replication in an active/active cluster configuration.

Each cluster replicates the data of the other cluster using the concept of source and remote topics. As the same topics are stored in each cluster, remote topics are automatically renamed by MirrorMaker 2 to represent the source cluster. The name of the originating cluster is prepended to the name of the topic.

Figure 7.2. Topic renaming

By flagging the originating cluster, topics are not replicated back to that cluster.

The concept of replication through remote topics is useful when configuring an architecture that requires data aggregation. Consumers can subscribe to source and remote topics within the same cluster, without the need for a separate aggregation cluster.

7.4.1.3. Unidirectional replication (active/passive)

The MirrorMaker 2 architecture supports unidirectional replication in an active/passive cluster configuration.

You can use an active/passive cluster configuration to make backups or migrate data to another cluster. In this situation, you might not want automatic renaming of remote topics.

You can override automatic renaming by adding IdentityReplicationPolicy to the source connector configuration. With this configuration applied, topics retain their original names.
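
A minimal sketch of the override follows; replication.policy.class is set on the source connector, and typically also on the checkpoint connector so that offsets are matched against the original topic names.

mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      config:
        replication.policy.class: org.apache.kafka.connect.mirror.IdentityReplicationPolicy
    checkpointConnector:
      config:
        replication.policy.class: org.apache.kafka.connect.mirror.IdentityReplicationPolicy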

Example YAML showing MirrorMaker 2 configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.4.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}
    topicsPattern: ".*"
    groupsPattern: "group1|group2|group3"

7.4.2. MirrorMaker configuration

Kafka MirrorMaker (also referred to as MirrorMaker 1) uses producers and consumers to replicate data across clusters as follows:

  • Consumers consume data from the source cluster
  • Producers output data to the target cluster

Consumer and producer configuration includes any required authentication and encryption settings. An include property defines the topics to mirror from the source to the target cluster.

Note

MirrorMaker was deprecated in Kafka 3.0.0 and will be removed in Kafka 4.0.0. As a consequence, the AMQ Streams KafkaMirrorMaker custom resource which is used to deploy MirrorMaker has been deprecated. The KafkaMirrorMaker resource will be removed from AMQ Streams when Kafka 4.0.0 is adopted.

Key Consumer configuration
Consumer group identifier
The consumer group ID for a MirrorMaker consumer so that messages consumed are assigned to a consumer group.
Number of consumer streams
A value to determine the number of consumers in a consumer group that consume messages in parallel.
Offset commit interval
An offset commit interval to set the time between consuming and committing a message.
Key Producer configuration
Cancel option for send failure
You can define whether a message send failure is ignored or MirrorMaker is terminated and recreated.
Example YAML showing MirrorMaker configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  # ...
  consumer:
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: "my-group"
    numStreams: 2
    offsetCommitInterval: 120000
    # ...
  producer:
    # ...
    abortOnSendFailure: false
    # ...
  include: "my-topic|other-topic"
  # ...

7.5. Kafka Connect configuration

Use AMQ Streams’s KafkaConnect resource to quickly and easily create new Kafka Connect clusters.

When you deploy Kafka Connect using the KafkaConnect resource, you specify bootstrap server addresses (in spec.bootstrapServers) for connecting to a Kafka cluster. You can specify more than one address in case a server goes down. You also specify the authentication credentials and TLS encryption certificates to make a secure connection.

Note

The Kafka cluster doesn’t need to be managed by AMQ Streams or deployed to an OpenShift cluster.

You can also use the KafkaConnect resource to specify the following:

  • Plugin configuration to build a container image that includes the plugins to make connections
  • Configuration for the worker pods that belong to the Kafka Connect cluster
  • An annotation to enable use of the KafkaConnector resource to manage plugins

The Cluster Operator manages Kafka Connect clusters deployed using the KafkaConnect resource and connectors created using the KafkaConnector resource.

Plugin configuration

Plugins provide the implementation for creating connector instances. When a plugin is instantiated, configuration is provided for connection to a specific type of external data system. Plugins provide a set of one or more JAR files that define a connector and task implementation for connecting to a given kind of data source. Plugins for many external systems are available for use with Kafka Connect. You can also create your own plugins.

The configuration describes the source input data and target output data to feed into and out of Kafka Connect. For a source connector, external source data must reference specific topics that will store the messages. The plugins might also contain the libraries and files needed to transform the data.

A Kafka Connect deployment can have one or more plugins, but only one version of each plugin.

You can create a custom Kafka Connect image that includes your choice of plugins. You can create the image in two ways:

  • Automatically, using the build configuration of the KafkaConnect resource
  • Manually, using a Dockerfile and an AMQ Streams Kafka container image as a base image

To create the container image automatically, you specify the plugins to add to your Kafka Connect cluster using the build property of the KafkaConnect resource. AMQ Streams automatically downloads and adds the plugin artifacts to a new container image.

Example plugin configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  build: 1
    output: 2
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 3
      - name: my-connector
        artifacts:
          - type: tgz
            url: https://<plugin_download_location>.tgz
            sha512sum: <checksum_to_verify_the_plugin>
      # ...
  # ...

1
Build configuration properties for building a container image with plugins automatically.
2
Configuration of the container registry where new images are pushed. The output properties describe the type and name of the image, and optionally the name of the secret containing the credentials needed to access the container registry.
3
List of plugins and their artifacts to add to the new container image. The plugins properties describe the type of artifact and the URL from which the artifact is downloaded. Each plugin must be configured with at least one artifact. Additionally, you can specify a SHA-512 checksum to verify the artifact before unpacking it.

If you are using a Dockerfile to build an image, you can use AMQ Streams’s latest container image as a base image to add your plugin configuration file.

Example showing manual addition of plugin configuration

FROM registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

Kafka Connect cluster configuration for workers

You specify the configuration for workers in the config property of the KafkaConnect resource.

A distributed Kafka Connect cluster has a group ID and a set of internal configuration topics.

  • group.id
  • offset.storage.topic
  • config.storage.topic
  • status.storage.topic

Kafka Connect clusters are configured by default with the same values for these properties. Kafka Connect clusters cannot share the same group ID or topic names, as this will create errors. If you use multiple Kafka Connect clusters, these settings must be unique for the workers of each cluster.

The names of the connectors used by each Kafka Connect cluster must also be unique.

In the following example worker configuration, JSON converters are specified. A replication factor is set for the internal Kafka topics used by Kafka Connect. This should be at least 3 for a production environment. Changing the replication factor after the topics have been created will have no effect.

Example worker configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
  config:
    # ...
    group.id: my-connect-cluster 1
    offset.storage.topic: my-connect-cluster-offsets 2
    config.storage.topic: my-connect-cluster-configs 3
    status.storage.topic: my-connect-cluster-status 4
    key.converter: org.apache.kafka.connect.json.JsonConverter 5
    value.converter: org.apache.kafka.connect.json.JsonConverter 6
    key.converter.schemas.enable: true 7
    value.converter.schemas.enable: true 8
    config.storage.replication.factor: 3 9
    offset.storage.replication.factor: 3 10
    status.storage.replication.factor: 3 11
  # ...

1
The Kafka Connect cluster ID within Kafka. Must be unique for each Kafka Connect cluster.
2
Kafka topic that stores connector offsets. Must be unique for each Kafka Connect cluster.
3
Kafka topic that stores connector and task configurations. Must be unique for each Kafka Connect cluster.
4
Kafka topic that stores connector and task status updates. Must be unique for each Kafka Connect cluster.
5
Converter to transform message keys into JSON format for storage in Kafka.
6
Converter to transform message values into JSON format for storage in Kafka.
7
Schema enabled for converting message keys into structured JSON format.
8
Schema enabled for converting message values into structured JSON format.
9
Replication factor for the Kafka topic that stores connector and task configurations.
10
Replication factor for the Kafka topic that stores connector offsets.
11
Replication factor for the Kafka topic that stores connector and task status updates.

KafkaConnector management of connectors

After plugins have been added to the container image used for the worker pods in a deployment, you can use AMQ Streams’s KafkaConnector custom resource or the Kafka Connect API to manage connector instances. You can also create new connector instances using these options.

The KafkaConnector resource offers an OpenShift-native approach to management of connectors by the Cluster Operator. To manage connectors with KafkaConnector resources, you must specify an annotation in your KafkaConnect custom resource.

Annotation to enable KafkaConnectors

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

Setting use-connector-resources to true enables KafkaConnectors to create, delete, and reconfigure connectors.

If use-connector-resources is enabled in your KafkaConnect configuration, you must use the KafkaConnector resource to define and manage connectors. KafkaConnector resources are configured to connect to external systems. They are deployed to the same OpenShift cluster as the Kafka Connect cluster and Kafka cluster interacting with the external data system.

Kafka components are contained in the same OpenShift cluster

The configuration specifies how connector instances connect to an external data system, including any authentication. You also need to state what data to watch. For a source connector, you might provide a database name in the configuration. You can also specify where the data should sit in Kafka by specifying a target topic name.

Use tasksMax to specify the maximum number of tasks. For example, a source connector with tasksMax: 2 might split the import of source data into two tasks.

Example KafkaConnector source connector configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  1
  labels:
    strimzi.io/cluster: my-connect-cluster 2
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
  tasksMax: 2 4
  config: 5
    file: "/opt/kafka/LICENSE" 6
    topic: my-topic 7
    # ...

1
Name of the KafkaConnector resource, which is used as the name of the connector. Use any name that is valid for an OpenShift resource.
2
Name of the Kafka Connect cluster to create the connector instance in. Connectors must be deployed to the same namespace as the Kafka Connect cluster they link to.
3
Full name of the connector class. This should be present in the image being used by the Kafka Connect cluster.
4
Maximum number of Kafka Connect tasks that the connector can create.
5
Connector configuration as key-value pairs.
6
Location of the external data file. In this example, we’re configuring the FileStreamSourceConnector to read from the /opt/kafka/LICENSE file.
7
Kafka topic to publish the source data to.
Note

You can load confidential configuration values for a connector from OpenShift Secrets or ConfigMaps.
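
One way to do this, sketched below, is to mount a Secret into the Kafka Connect pods using externalConfiguration and reference its values through Kafka's FileConfigProvider. The Secret name, file name, and property key used here are hypothetical.

# KafkaConnect fragment: enable the file config provider and mount the Secret
spec:
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-credentials
        secret:
          secretName: my-connector-credentials # hypothetical Secret

# KafkaConnector fragment: reference a value from the mounted file
spec:
  config:
    database.password: "${file:/opt/kafka/external-configuration/connector-credentials/credentials.properties:password}"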

Kafka Connect API

Use the Kafka Connect REST API as an alternative to using KafkaConnector resources to manage connectors. The Kafka Connect REST API is available as a service running on <connect_cluster_name>-connect-api:8083, where <connect_cluster_name> is the name of your Kafka Connect cluster.

You add the connector configuration as a JSON object.

Example curl request to add connector configuration

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'

If KafkaConnectors are enabled, manual changes made directly using the Kafka Connect REST API are reverted by the Cluster Operator.

The operations supported by the REST API are described in the Apache Kafka Connect API documentation.

Note

You can expose the Kafka Connect API service outside OpenShift. You do this by creating a service that uses a connection mechanism that provides the access, such as an ingress or route. If you expose the API in this way, use it with caution, because the connection is insecure.
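
For example, an OpenShift route could expose the service as follows. This is a minimal sketch; the route name is arbitrary, and no TLS or authentication is added, so restrict access accordingly.

apiVersion: route.openshift.io/v1
kind: Route
metadata:
  name: my-connect-api
spec:
  to:
    kind: Service
    name: my-connect-cluster-connect-api
  port:
    targetPort: 8083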

7.6. Kafka Bridge configuration

A Kafka Bridge configuration requires a bootstrap server specification for the Kafka cluster it connects to, as well as any encryption and authentication options required.

Kafka Bridge consumer and producer configuration is standard, as described in the Apache Kafka configuration documentation for consumers and Apache Kafka configuration documentation for producers.

HTTP-related configuration options set the port on which the server listens.

CORS

The Kafka Bridge supports the use of Cross-Origin Resource Sharing (CORS). CORS is an HTTP mechanism that allows browser access to selected resources from more than one origin, for example, resources on different domains. If you choose to use CORS, you can define a list of allowed resource origins and HTTP methods for interaction with the Kafka cluster through the Kafka Bridge. The lists are defined in the http specification of the Kafka Bridge configuration.

CORS allows for simple and preflighted requests between origin sources on different domains.

  • A simple request is an HTTP request that must have an allowed origin defined in its header.
  • A preflighted request sends an initial OPTIONS HTTP request before the actual request to check that the origin and the method are allowed.

Example YAML showing Kafka Bridge configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
    cors:
      allowedOrigins: "https://strimzi.io"
      allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH"
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000
  # ...
