Chapter 2. Enhancements
The enhancements added in this release are outlined below.
2.1. Kafka enhancements
For an overview of the enhancements introduced with:
- Kafka 2.6.2, refer to the Kafka 2.6.2 Release Notes (applies only to AMQ Streams 1.6.4)
- Kafka 2.6.1, refer to the Kafka 2.6.1 Release Notes (applies only to AMQ Streams 1.6.4)
- Kafka 2.6.0, refer to the Kafka 2.6.0 Release Notes
2.2. Kafka Bridge enhancements
This release includes the following enhancements to the Kafka Bridge component of AMQ Streams.
Retrieve partitions and metadata
The Kafka Bridge now supports the following operations:
Retrieve a list of partitions for a given topic:
GET /topics/{topicname}/partitions
Retrieve metadata for a given partition, such as the partition ID, the leader broker, and the number of replicas:
GET /topics/{topicname}/partitions/{partitionid}
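For example, you can call these endpoints with curl. The following is a minimal sketch, assuming a Kafka Bridge listening on localhost:8080 and a topic named my-topic (both illustrative):
# List the partitions for topic my-topic
curl -X GET http://localhost:8080/topics/my-topic/partitions \
  -H 'accept: application/vnd.kafka.v2+json'

# Fetch metadata for partition 0 of topic my-topic
curl -X GET http://localhost:8080/topics/my-topic/partitions/0 \
  -H 'accept: application/vnd.kafka.v2+json'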
See the Kafka Bridge API reference.
Support for Kafka message headers
Messages sent using the Kafka Bridge can now include Kafka message headers.
In a POST request to the /topics endpoint, you can optionally specify headers in the message payload, which is contained in the request body. Message header values must be in binary format and encoded as Base64.
Example request with Kafka message header
curl -X POST \
  http://localhost:8080/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001",
            "partition": 2,
            "headers": [
              {
                "key": "key1",
                "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ=="
              }
            ]
        }
    ]
}'
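Because header values must be Base64-encoded, you can encode them on the command line before building the payload. The following sketch encodes the plain-text value used in the example above:
# Base64-encode a header value for use in the request body
echo -n 'Apache Kafka is the bomb!' | base64
# Output: QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==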
2.3. MirrorMaker 2.0 topic renaming update
The MirrorMaker 2.0 architecture supports bidirectional replication by automatically renaming remote topics to represent the source cluster. The name of the originating cluster is prepended to the name of the topic.
Optionally, you can now override automatic renaming by adding IdentityReplicationPolicy to the source connector configuration. With this configuration applied, topics retain their original names.
replication.policy.class=io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy 1
- 1: Adds a policy that overrides the automatic renaming of remote topics. Instead of prepending the topic name with the name of the source cluster, the topic retains its original name.
The override is useful, for example, in an active/passive cluster configuration where you want to make backups or migrate data to another cluster. In either situation, you might not want automatic renaming of remote topics.
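As a sketch only, an active/passive MirrorMaker 2.0 properties file might combine the policy with a one-way replication flow as follows; the cluster aliases and bootstrap addresses are placeholders:
clusters=source, target
source.bootstrap.servers=SOURCE-CLUSTER-ADDRESS:9092
target.bootstrap.servers=TARGET-CLUSTER-ADDRESS:9092
# Replicate from the active cluster to the passive backup only
source->target.enabled=true
source->target.topics=.*
# Keep original topic names on the target instead of prepending "source."
replication.policy.class=io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy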
2.4. OAuth 2.0 authentication and authorization
This release includes the following enhancements to OAuth 2.0 token-based authentication and authorization.
Session re-authentication
OAuth 2.0 authentication in AMQ Streams now supports session re-authentication for Kafka brokers. This defines the maximum duration of an authenticated OAuth 2.0 session between a Kafka client and a Kafka broker. Session re-authentication is supported for both types of token validation: fast local JWT and introspection endpoint.
You configure session re-authentication in the OAuth 2.0 configuration for Kafka brokers, in the server.properties file.
- To apply to all listeners, set the connections.max.reauth.ms property in milliseconds.
- To apply to a specific listener, set the listener.name.LISTENER-NAME.oauthbearer.connections.max.reauth.ms property in milliseconds. LISTENER-NAME is the case-insensitive name of the listener.
An authenticated session is closed if it exceeds the configured maximum session re-authentication time, or if the access token expiry time is reached. The client must then log in to the authorization server again, obtain a new access token, and re-authenticate to the Kafka broker, which establishes a new authenticated session over the existing connection.
When re-authentication is required, any operation the client attempts other than re-authentication causes the broker to terminate the connection.
Example listener configuration for session re-authentication after 1 hour
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \
  oauth.username.claim="preferred_username" \
  oauth.client.id="kafka-broker" \
  oauth.client.secret="kafka-secret" \
  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
See: Session re-authentication for Kafka brokers and Configuring OAuth 2.0 support for Kafka brokers.
JWKS keys refresh interval
When configuring Kafka brokers to use fast local JWT token validation, you can now set the oauth.jwks.refresh.min.pause.seconds option in the listener configuration (in the server.properties file). This defines the minimum interval between attempts by the broker to refresh JSON Web Key Set (JWKS) public keys issued by the authorization server.
With this release, if the Kafka broker detects an unknown signing key, it attempts to refresh JWKS keys immediately and ignores the regular refresh schedule.
Example configuration for a 2-minute pause between attempts to refresh JWKS keys
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \
oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \
oauth.jwks.refresh.seconds="300" \
oauth.jwks.refresh.min.pause.seconds="120" \
# ...
oauth.ssl.truststore.type="PKCS12" ;
The refresh schedule for JWKS keys is set in the oauth.jwks.refresh.seconds option. When an unknown signing key is encountered, a JWKS keys refresh is scheduled outside of the regular schedule. The refresh does not start until the time since the last refresh reaches the interval specified in oauth.jwks.refresh.min.pause.seconds, which has a default value of 1 second.
See Configuring OAuth 2.0 support for Kafka brokers.
Refreshing grants from Red Hat Single Sign-On
New configuration options have been added for OAuth 2.0 token-based authorization through Red Hat Single Sign-On. When configuring Kafka brokers, you can now define the following options related to refreshing grants from Red Hat SSO Authorization Services:
- strimzi.authorization.grants.refresh.period.seconds: The time between two consecutive grants refresh runs. The default value is 60. If set to 0 or less, refreshing of grants is disabled.
- strimzi.authorization.grants.refresh.pool.size: The number of threads that can fetch grants for active sessions in parallel. The default value is 5.
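For example, these options sit alongside the rest of the authorizer configuration in the server.properties file. The following is a minimal sketch, assuming the broker is already configured for OAuth 2.0 authentication; the endpoint address and option values are illustrative:
authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
strimzi.authorization.token.endpoint.uri=https://AUTH-SERVER-ADDRESS/token
strimzi.authorization.client.id=kafka
# Refresh grants every 2 minutes using up to 10 threads
strimzi.authorization.grants.refresh.period.seconds=120
strimzi.authorization.grants.refresh.pool.size=10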
See Using OAuth 2.0 token-based authorization and Configuring OAuth 2.0 authorization support.
Detection of permission changes in Red Hat Single Sign-On
With this release, the KeycloakRBACAuthorizer (Red Hat Single Sign-On) authorizer regularly checks for changes to the permissions of active sessions. Changes made in central user and permissions management are now detected in real time.
2.5. Deprecation of ZooKeeper option in Kafka administrative tools
The --zookeeper option was deprecated in the following Kafka administrative tools:
- bin/kafka-configs.sh
- bin/kafka-leader-election.sh
- bin/kafka-topics.sh
When using these tools, you should now use the --bootstrap-server option to specify the Kafka broker to connect to. For example:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Although the --zookeeper option still works, it will be removed from all the administrative tools in a future Kafka release. This is part of ongoing work in the Apache Kafka project to remove Kafka’s dependency on ZooKeeper.
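For comparison, a kafka-configs.sh invocation migrated from the deprecated option might look as follows; the addresses and topic name are illustrative:
# Deprecated: connects through ZooKeeper
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

# Preferred: connects directly to a Kafka broker
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe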
The Using AMQ Streams on RHEL guide has been updated to use the --bootstrap-server option in several procedures.