Chapter 5. Configuring secure connections
Securing the connection between a Kafka cluster and a client application helps to ensure the confidentiality, integrity, and authenticity of the communication between the cluster and the client.
To achieve a secure connection, you can introduce configuration related to authentication, encryption, and authorization:
- Authentication
- Use an authentication mechanism to verify the identity of a client application.
- Encryption
- Enable encryption of data in transit between the client and broker using SSL/TLS encryption.
- Authorization
- Control client access and operations allowed on Kafka brokers based on the authenticated identity of a client application.
Authorization cannot be used without authentication. If authentication is not enabled, it’s not possible to determine the identity of clients, and therefore, it’s not possible to enforce authorization rules. This means that even if authorization rules are defined, they will not be enforced without authentication.
In Streams for Apache Kafka, listeners are used to configure the network connections between the Kafka brokers and the clients. Listener configuration options determine how the brokers listen for incoming client connections and how secure access is managed. The exact configuration required depends on the authentication, encryption, and authorization mechanisms you have chosen.
You configure your Kafka brokers and client applications to enable security features. The general outline to secure a client connection to a Kafka cluster is as follows:
- Install the Streams for Apache Kafka components, including the Kafka cluster.
- For TLS, generate TLS certificates for each broker and client application.
- Configure listeners in the broker configuration for secure connection.
- Configure the client application for secure connection.
Configure your client application according to the mechanisms you are using to establish a secure and authenticated connection with the Kafka brokers. The authentication, encryption, and authorization used by a Kafka broker must match those used by a connecting client application. The client application and broker need to agree on the security protocols and configurations for secure communication to take place. For example, a Kafka client and the Kafka broker must use the same TLS versions and cipher suites.
Mismatched security configurations between the client and broker can result in connection failures or potential security vulnerabilities. It’s important to carefully configure and test both the broker and client application to ensure they are properly secured and able to communicate securely.
5.1. Setting up brokers for secure access
Before you can configure client applications for secure access, you must first set up the brokers in your Kafka cluster to support the security mechanisms you want to use. To enable secure connections, you create listeners with the appropriate configuration for the security mechanisms.
5.1.1. Establishing a secure connection to a Kafka cluster running on RHEL
When using Streams for Apache Kafka on RHEL, the general outline to secure a client connection to a Kafka cluster is as follows:
- Install the Streams for Apache Kafka components, including the Kafka cluster, on the RHEL server.
- For TLS, generate TLS certificates for all brokers in the Kafka cluster.
- Configure listeners in the broker configuration properties file.
- Configure authentication for your Kafka cluster listeners, such as TLS or SASL SCRAM-SHA-512.
- Configure authorization for all enabled listeners on the Kafka cluster, such as simple authorization.
- For TLS, generate TLS certificates for each client application.
- Create a config.properties file to specify the connection details and authentication credentials used by the client application.
- Start the Kafka client application and connect to the Kafka cluster. Use the properties defined in the config.properties file to connect to the Kafka broker.
- Verify that the client can successfully connect to the Kafka cluster and consume and produce messages securely, for example by using the console tools shown after this list.
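One way to do this, assuming the client configuration is saved as config.properties and a test topic named my-topic exists, is to run the Kafka console producer and consumer tools with that file:
Example verification using the Kafka console tools
bin/kafka-console-producer.sh --bootstrap-server <broker_host>:<port> --topic my-topic --producer.config config.properties
bin/kafka-console-consumer.sh --bootstrap-server <broker_host>:<port> --topic my-topic --from-beginning --consumer.config config.properties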
5.1.2. Configuring secure listeners for a Kafka cluster on RHEL
Use a configuration properties file to configure listeners in Kafka. To configure a secure connection for Kafka brokers, you set the relevant properties for TLS, SASL, and other security-related configurations in this file.
Here is an example configuration of a TLS listener specified in a server.properties configuration file for a Kafka broker, with a keystore and truststore in PKCS#12 format:
Example listener configuration in server.properties
listeners = listener_1://0.0.0.0:9093, listener_2://0.0.0.0:9094
listener.security.protocol.map = listener_1:SSL, listener_2:PLAINTEXT
ssl.keystore.type = PKCS12
ssl.keystore.location = /path/to/keystore.p12
ssl.keystore.password = <password>
ssl.truststore.type = PKCS12
ssl.truststore.location = /path/to/truststore.p12
ssl.truststore.password = <password>
ssl.client.auth = required
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users = User:superuser
The listeners property specifies each listener name, and the IP address and port that the broker listens on. The protocol map tells the listener_1 listener to use the SSL protocol for clients that use TLS encryption. listener_2 provides PLAINTEXT connections for clients that do not use TLS encryption. The keystore contains the broker's private key and certificate. The truststore contains the trusted certificates used to verify the identity of the client application. The ssl.client.auth property enforces client authentication.
The Kafka cluster uses simple authorization. The authorizer is set to SimpleAclAuthorizer. A single super user is defined for unconstrained access on all listeners. Streams for Apache Kafka supports the Kafka SimpleAclAuthorizer and custom authorizer plugins.
If you prefix the configuration properties with listener.name.<name_of_listener>, the configuration is specific to that listener.
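For example, to apply a keystore to listener_1 only, you might prefix the SSL properties with the listener name, as in this minimal sketch (the file path is a placeholder):
Example listener-specific configuration in server.properties
listener.name.listener_1.ssl.keystore.location = /path/to/listener_1-keystore.p12
listener.name.listener_1.ssl.keystore.password = <password>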
This is just a sample configuration. Some configuration options are specific to the type of listener. If you are using OAuth 2.0 or Open Policy Agent (OPA), you must also configure access to the authorization server or OPA server in a specific listener. You can create listeners based on your specific requirements and environment.
For more information on listener configuration, see the Apache Kafka documentation.
Using ACLs to fine-tune access
You can use Access Control Lists (ACLs) to fine-tune access to the Kafka cluster. To create and manage ACLs, use the kafka-acls.sh command line tool. The ACLs apply access rules to client applications.
In the following example, the first ACL grants read and describe permissions for a specific topic named my-topic. The resource pattern type is set to literal, which means that the resource name must match exactly.
The second ACL grants read permissions for a specific consumer group named my-group. The resource pattern type is set to prefixed, which means that the resource name only has to match the prefix.
Example ACL configuration
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:my-user --operation Read --operation Describe --topic my-topic --resource-pattern-type literal \
--allow-principal User:my-user --operation Read --group my-group --resource-pattern-type prefixed
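To review the ACLs that are in place, you can list them with the same tool, for example:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic my-topic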
5.1.3. Establishing a secure connection to a Kafka cluster running on OpenShift
When using Streams for Apache Kafka on OpenShift, the general outline to secure a client connection to a Kafka cluster is as follows:
- Use the Cluster Operator to deploy a Kafka cluster in your OpenShift environment. Use the Kafka custom resource to configure and install the cluster and create listeners.
- Configure authentication for the listeners, such as TLS or SASL SCRAM-SHA-512. The Cluster Operator creates a secret that contains a cluster CA certificate to verify the identity of the Kafka brokers.
- Configure authorization for all enabled listeners, such as simple authorization.
- Use the User Operator to create a Kafka user representing your client. Use the KafkaUser custom resource to configure and create the user.
- Configure authentication for your Kafka user (client) that matches the authentication mechanism of a listener. The User Operator creates a secret that contains a client certificate and private key for the client to use for authentication with the Kafka cluster.
- Configure authorization for your Kafka user (client) that matches the authorization mechanism of the listener. Authorization rules allow specific operations on the Kafka cluster.
- Create a config.properties file to specify the connection details and authentication credentials required by the client application to connect to the cluster.
- Start the Kafka client application and connect to the Kafka cluster. Use the properties defined in the config.properties file to connect to the Kafka broker.
- Verify that the client can successfully connect to the Kafka cluster and consume and produce messages securely.
5.1.4. Configuring secure listeners for a Kafka cluster on OpenShift
When you deploy a Kafka custom resource with Streams for Apache Kafka, you add listener configuration to the Kafka spec. Use the listener configuration to secure connections in Kafka. To configure a secure connection for Kafka brokers, set the relevant properties for TLS, SASL, and other security-related configurations at the listener level.
External listeners provide client access to a Kafka cluster from outside the OpenShift cluster. Streams for Apache Kafka creates listener services and bootstrap addresses to enable access to the Kafka cluster based on the configuration. For example, you can create external listeners that use the following connection mechanisms:
- Node ports
- Load balancers
- OpenShift routes
Here is an example configuration of listeners for a Kafka resource, including an external route listener:
Example listener configuration in the Kafka resource
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
# ...
listeners:
- name: plaintext
port: 9092
type: internal
tls: false
configuration:
useServiceDnsDomain: true
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: route
tls: true
authentication:
type: tls
authorization:
type: simple
superUsers:
- CN=superuser
# ...
The listeners property is configured with three listeners: plaintext, tls, and external. The external listener is of type route, and it uses TLS for both encryption and authentication. When you create the Kafka cluster with the Cluster Operator, CA certificates are automatically generated. You add the cluster CA certificate to the truststore of your client application to verify the identity of the Kafka brokers. Alternatively, you can configure Streams for Apache Kafka to use your own certificates at the broker or listener level. Using certificates at the listener level might be required when client applications require different security configurations. Using certificates at the listener level also adds an additional layer of control and security.
Use configuration provider plugins to load configuration data to producer and consumer clients. The configuration provider plugin loads configuration data from secrets or ConfigMaps. For example, you can tell the provider to automatically get certificates from Strimzi secrets. For more information, see the Streams for Apache Kafka documentation for running on OpenShift.
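As a sketch of that approach, assuming the Strimzi Kubernetes configuration provider is on the client classpath and the client has permission to read the secret, the client properties might look like this:
config.providers = secrets
config.providers.secrets.class = io.strimzi.kafka.KubernetesSecretConfigProvider
ssl.truststore.type = PEM
ssl.truststore.certificates = ${secrets:<namespace>/my-cluster-cluster-ca-cert:ca.crt}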
The Kafka cluster uses simple authorization. The authorization property type is set to simple. A single super user is defined for unconstrained access on all listeners. Streams for Apache Kafka supports the Kafka SimpleAclAuthorizer and custom authorizer plugins.
This is just a sample configuration. Some configuration options are specific to the type of listener. If you are using OAuth 2.0 or Open Policy Agent (OPA), you must also configure access to the authorization server or OPA server in a specific listener. You can create listeners based on your specific requirements and environment.
For more information on listener configuration, see the GenericKafkaListener schema reference.
When using a route type listener for client access to a Kafka cluster on OpenShift, the TLS passthrough feature is enabled. An OpenShift route is designed to work with the HTTP protocol, but it can also be used to proxy network traffic for other protocols, including the Kafka protocol used by Apache Kafka. The client establishes a connection to the route, and the route forwards the traffic to the broker running in the OpenShift cluster using the TLS Server Name Indication (SNI) extension to get the target hostname. The SNI extension allows the route to correctly identify the target broker for each connection.
Using ACLs to fine-tune access
You can use Access Control Lists (ACLs) to fine-tune access to the Kafka cluster. To add ACLs, you configure the KafkaUser custom resource. When you create a KafkaUser, Streams for Apache Kafka automatically manages the creation and updating of the ACLs. The ACLs apply access rules to client applications.
In the following example, the first ACL grants read and describe permissions for a specific topic named my-topic. The resource.patternType is set to literal, which means that the resource name must match exactly.
The second ACL grants read permissions for a specific consumer group named my-group. The resource.patternType is set to prefix, which means that the resource name only has to match the prefix.
Example ACL configuration in the KafkaUser resource
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-user
labels:
strimzi.io/cluster: my-cluster
spec:
# ...
authorization:
type: simple
acls:
- resource:
type: topic
name: my-topic
patternType: literal
operations:
- Read
- Describe
- resource:
type: group
name: my-group
patternType: prefix
operations:
- Read
If you specify tls-external as an authentication option when configuring the Kafka user, you can use your own client certificates rather than those generated by the User Operator.
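For example, a KafkaUser configured to use your own certificates might look like the following minimal sketch; with tls-external, the User Operator does not generate a certificate secret for the user:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls-external
  # ...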
5.2. Setting up clients for secure access
After you have set up listeners on your Kafka brokers to support secure connections, the next step is to configure your client applications to use these listeners to communicate with the Kafka cluster. This involves providing the appropriate security settings for each client to authenticate with the cluster based on the security mechanisms configured on the listener.
5.2.1. Configuring security protocols
Configure the security protocol used by your client application to match the protocol configured on a Kafka broker listener. For example, use SSL (Secure Sockets Layer) for TLS authentication or SASL_SSL for SASL (Simple Authentication and Security Layer) authentication with TLS encryption. Add a truststore and keystore to your client configuration that supports the authentication mechanism required to access the Kafka cluster.
- Truststore
- The truststore contains the public certificates of the trusted certificate authority (CA) that are used to verify the authenticity of a Kafka broker. When the client connects to a secure Kafka broker, it might need to verify the identity of the broker.
- Keystore
- The keystore contains the client’s private key and its public certificate. When the client wants to authenticate itself to the broker, it presents its own certificate.
If you are using TLS authentication, your Kafka client configuration requires a truststore and keystore to connect to a Kafka cluster. If you are using SASL SCRAM-SHA-512, authentication is performed through the exchange of username and password credentials, rather than digital certificates, so a keystore is not required. SCRAM-SHA-512 is a more lightweight mechanism, but it is not as secure as using certificate-based authentication.
If you have your own certificate infrastructure in place and use certificates from a third-party CA, then the client’s default truststore will likely already contain the public CA certificates and you do not need to add them to the client’s truststore. The client automatically trusts the server’s certificate if it is signed by one of the public CA certificates that is already included in the default truststore.
You can create a config.properties file to specify the authentication credentials used by the client application.
In the following example, the security.protocol is set to SSL to enable TLS authentication and encryption between the client and broker.
The ssl.truststore.location and ssl.truststore.password properties specify the location and password of the truststore. The ssl.keystore.location and ssl.keystore.password properties specify the location and password of the keystore.
The PKCS #12 (Public-Key Cryptography Standards #12) file format is used. You can also use the base64-encoded PEM (Privacy Enhanced Mail) format.
Example client configuration properties for TLS authentication
bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SSL
ssl.truststore.location = /path/to/ca.p12
ssl.truststore.password = truststore-password
ssl.keystore.location = /path/to/user.p12
ssl.keystore.password = keystore-password
client.id = my-client
In the following example, the security.protocol is set to SASL_SSL to enable SASL authentication with TLS encryption between the client and broker. If you only need authentication and not encryption, you can use the SASL_PLAINTEXT protocol. The specified SASL mechanism for authentication is SCRAM-SHA-512. Different authentication mechanisms can be used. The sasl.jaas.config property specifies the authentication credentials.
Example client configuration properties for SCRAM-SHA-512 authentication
bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
username = "user" \
password = "secret";
ssl.truststore.location = path/to/truststore.p12
ssl.truststore.password = truststore_password
ssl.truststore.type = PKCS12
client.id = my-client
For applications that do not support PEM format, you can use a tool like OpenSSL to convert PEM files to PKCS #12 format.
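For example, a certificate and private key in PEM format might be combined into a PKCS #12 keystore with a command along these lines (the file and alias names are placeholders):
openssl pkcs12 -export -in client.crt -inkey client.key -certfile ca.crt -name my-client -out client.p12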
5.2.2. Configuring permitted TLS versions and cipher suites
You can incorporate SSL configuration and cipher suites to further secure TLS-based communication between your client application and a Kafka cluster. Specify the supported TLS versions and cipher suites in the configuration for the Kafka broker. You can also add the configuration to your clients if you wish to limit the TLS versions and cipher suites they use. The configuration on the client should only use protocols and cipher suites that are enabled on the brokers.
In the following example, SSL is enabled using security.protocol for communication between Kafka brokers and client applications. The ssl.cipher.suites property specifies, as a comma-separated list, the cipher suites that are allowed to be used.
Example SSL configuration properties for Kafka brokers
security.protocol: "SSL" ssl.enabled.protocols: "TLSv1.3", "TLSv1.2" ssl.protocol: "TLSv1.3" ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"
security.protocol: "SSL"
ssl.enabled.protocols: "TLSv1.3", "TLSv1.2"
ssl.protocol: "TLSv1.3"
ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"
The ssl.enabled.protocols property specifies the available TLS versions that can be used for secure communication between the cluster and its clients. In this case, both TLSv1.3 and TLSv1.2 are enabled. The ssl.protocol property sets the default TLS version for all connections, and it must be chosen from the enabled protocols. By default, clients communicate using TLSv1.3. If a client only supports TLSv1.2, it can still connect to the broker and communicate using that supported version. Similarly, if the configuration is on the client and the broker only supports TLSv1.2, the client uses the supported version.
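If you also want to restrict the versions and cipher suites used by the client, you can set the equivalent properties in the client configuration; they should only include values that are enabled on the brokers:
Example SSL configuration properties for a client
security.protocol = SSL
ssl.enabled.protocols = TLSv1.3,TLSv1.2
ssl.cipher.suites = TLS_AES_256_GCM_SHA384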
The cipher suites supported by Apache Kafka depend on the version of Kafka you are using and the underlying environment. Check for the latest supported cipher suites that provide the highest level of security.
5.2.3. Using Access Control Lists (ACLs)
You do not have to configure anything explicitly for ACLs in your client application. The ACLs are enforced on the server side by the Kafka broker. When the client sends a request to the server to produce or consume data, the server checks the ACLs to determine if the client (user) is authorized to perform the requested operation. If the client is authorized, the request is processed; otherwise, the request is denied and an error is returned. However, the client must still be authenticated and use the appropriate security protocol to enable a secure connection with the Kafka cluster.
If you are using Access Control Lists (ACLs) on your Kafka brokers, make sure that ACLs are properly set up to restrict client access to the topics and operations that you want to control. If you are using Open Policy Agent (OPA) policies to manage access, authorization rules are configured in the policies, so you do not need to specify ACLs against the Kafka brokers. OAuth 2.0 gives some flexibility: you can use the OAuth 2.0 provider to manage ACLs, or use OAuth 2.0 together with Kafka's simple authorization to manage the ACLs.
ACLs apply to most types of requests and are not limited to produce and consume operations. For example, ACLs can be applied to read operations like describing topics or write operations like creating new topics.
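For example, on a cluster running on RHEL with simple authorization, an administrator might grant a user permission to create a topic with a command such as the following (the principal and topic names are placeholders):
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:my-user --operation Create --topic my-topic --resource-pattern-type literal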
5.2.4. Using OAuth 2.0 for token-based access
Use the OAuth 2.0 open standard for authorization with Streams for Apache Kafka to enforce authorization controls through an OAuth 2.0 provider. OAuth 2.0 provides a secure way for applications to access user data stored in other systems. An authorization server can issue access tokens to client applications that grant access to a Kafka cluster.
The following steps describe the general approach to set up and use OAuth 2.0 for token validation:
- Configure the authorization server with broker and client credentials, such as a client ID and secret.
- Obtain the OAuth 2.0 credentials from the authorization server.
- Configure listeners on the Kafka brokers with OAuth 2.0 credentials and to interact with the authorization server.
- Add the OAuth 2.0 dependency to the client library.
- Configure your Kafka client with OAuth 2.0 credentials and to interact with the authorization server.
- Obtain an access token at runtime, which authenticates the client with the OAuth 2.0 provider.
If you have a listener configured for OAuth 2.0 on your Kafka broker, you can set up your client application to use OAuth 2.0. In addition to the standard Kafka client configurations to access the Kafka cluster, you must include specific configurations for OAuth 2.0 authentication. You must also make sure that the authorization server you are using is accessible by the Kafka cluster and client application.
Specify a SASL (Simple Authentication and Security Layer) security protocol and mechanism. In a production environment, the following settings are recommended:
- The SASL_SSL protocol for TLS encrypted connections.
- The OAUTHBEARER mechanism for credentials exchange using a bearer token.
A JAAS (Java Authentication and Authorization Service) module implements the SASL mechanism. The configuration for the mechanism depends on the authentication method you are using. For example, using credentials exchange you add an OAuth 2.0 access token endpoint, access token, client ID, and client secret. A client connects to the token endpoint (URL) of the authorization server to check if a token is still valid. You also need a truststore that contains the public key certificate of the authorization server for authenticated access.
Example client configuration properties for OAuth 2.0
bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = OAUTHBEARER
# ...
sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \
oauth.access.token = <access_token> \
oauth.client.id = "<client_id>" \
oauth.client.secret = "<client_secret>" \
oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \
oauth.ssl.truststore.password = "<truststore_password>" \
oauth.ssl.truststore.type = "PKCS12";
5.2.5. Using Open Policy Agent (OPA) access policies
Use Open Policy Agent (OPA) with Streams for Apache Kafka to evaluate requests to connect to your Kafka cluster against access policies. Open Policy Agent (OPA) is a policy engine that manages authorization policies. Policies centralize access control, and can be updated dynamically, without requiring changes to the client application. For example, you can create a policy that allows only certain users (clients) to produce messages to and consume messages from a specific topic.
Streams for Apache Kafka uses the Open Policy Agent plugin for Kafka authorization as the authorizer.
The following steps describe the general approach to set up and use OPA:
- Set up an instance of the OPA server.
- Define policies that provide the authorization rules that govern access to the Kafka cluster.
- Create configuration for the Kafka brokers to accept OPA authorization and interact with the OPA server (a broker-side sketch follows this list).
- Configure your Kafka client to provide the credentials for authorized access to the Kafka cluster.
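As a rough broker-side sketch, assuming the Open Policy Agent plugin for Kafka authorization is installed (the exact class and property names depend on the plugin version you use), the broker configuration might resemble the following:
authorizer.class.name = org.openpolicyagent.kafka.OpaAuthorizer
opa.authorizer.url = http://opa:8181/v1/data/kafka/authz/allow
opa.authorizer.allow.on.error = false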
If you have a listener configured for OPA on your Kafka broker, you can set up your client application to use OPA. In the listener configuration, you specify a URL to connect to the OPA server and authorize your client application. In addition to the standard Kafka client configurations to access the Kafka cluster, you must add the credentials to authenticate with the Kafka broker. The broker checks if the client has the necessary authorization to perform a requested operation, by sending a request to the OPA server to evaluate the authorization policy. You don’t need a truststore or keystore to secure communication as the policy engine enforces authorization policies.
Example client configuration properties for OPA authorization
bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
username = "user" \
password = "secret";
# ...
Red Hat does not support the OPA server.
5.2.6. Using transactions when streaming messages
By configuring transaction properties in your brokers and producer client application, you can ensure that messages are processed in a single transaction. Transactions add reliability and consistency to the streaming of messages.
Transactions are always enabled on brokers. You can change the default configuration using the following properties:
Example Kafka broker configuration properties for transactions
transaction.state.log.replication.factor = 3
transaction.state.log.min.isr = 2
transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000
This is a typical configuration for a production environment, which creates 3 replicas for the internal __transaction_state topic. The __transaction_state topic stores information about the transactions in progress. A minimum of 2 in-sync replicas are required for the transaction logs. The cleanup interval is the time between checks for timed-out transactions and cleanup of the corresponding transaction logs.
To add transaction properties to a client configuration, you set the following properties for producers and consumers.
Example producer client configuration properties for transactions
transactional.id = unique-transactional-id
enable.idempotence = true
max.in.flight.requests.per.connection = 5
acks = all
retries=2147483647
transaction.timeout.ms = 30000
delivery.timeout.ms = 25000
The transactional ID allows the Kafka broker to keep track of the transactions. It is a unique identifier for the producer and should be used with a specific set of partitions. If you need to perform transactions for multiple sets of partitions, you need to use a different transactional ID for each set. Idempotence is enabled to avoid the producer instance creating duplicate messages. With idempotence, messages are tracked using a producer ID and sequence number. When the broker receives the message, it checks the producer ID and sequence number. If a message with the same producer ID and sequence number has already been received, the broker discards the duplicate message.
The maximum number of in-flight requests is set to 5 so that transactions are processed in the order they are sent. A partition can have up to 5 in-flight requests without compromising the ordering of messages.
By setting acks to all, the producer waits for acknowledgments from all in-sync replicas of the topic partitions to which it is writing before considering the transaction as complete. This ensures that the messages are durably written (committed) to the Kafka cluster, and that they will not be lost even in the event of a broker failure.
The transaction timeout specifies the maximum amount of time the client has to complete a transaction before it times out. The delivery timeout specifies the maximum amount of time the producer waits for a broker acknowledgement of message delivery before it times out. To ensure that messages are delivered within the transaction period, set the delivery timeout to be less than the transaction timeout. Consider network latency and message throughput, and allow for temporary failures, when specifying retries for the number of attempts to resend a failed message request.
Example consumer client configuration properties for transactions
group.id = my-group-id
isolation.level = read_committed
enable.auto.commit = false
The read_committed isolation level specifies that the consumer only reads messages for a transaction that has completed successfully. The consumer does not process any messages that are part of an ongoing or failed transaction. This ensures that the consumer only reads messages that are part of a fully complete transaction.
When using transactions to stream messages, it is important to set enable.auto.commit to false. If set to true, the consumer periodically commits offsets without regard to transactions. This means that the consumer may commit messages before a transaction has fully completed. By setting enable.auto.commit to false, the consumer only reads and commits messages that have been fully written and committed to the topic as part of a transaction.