Chapter 6. Setting up client access to the Kafka cluster
After you have deployed AMQ Streams, the procedures in this section explain how to:
- Deploy example producer and consumer clients, which you can use to verify your deployment
- Set up external client access to the Kafka cluster
The steps to set up access to the Kafka cluster for a client outside OpenShift are more complex, and require familiarity with the Kafka component configuration procedures described in the Using AMQ Streams on OpenShift guide.
6.1. Deploying example clients
This procedure shows how to deploy example producer and consumer clients that use the Kafka cluster you created to send and receive messages.
Prerequisites
- The Kafka cluster is available for the clients.
Procedure
Deploy a Kafka producer.
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
- Type a message into the console where the producer is running.
- Press Enter to send the message.
Deploy a Kafka consumer.
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
- Confirm that you see the incoming messages in the consumer console.
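Note that the bootstrap address used by the example clients resolves only within the namespace where the Kafka cluster is running. To run the clients from a different namespace, you can use the fully qualified service name; a minimal sketch, assuming a cluster named my-cluster in the myproject namespace:

oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap.myproject.svc:9092 --topic my-topic --from-beginning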
6.2. Setting up access for clients outside of OpenShift
This procedure shows how to configure client access to a Kafka cluster from outside OpenShift.
Using the address of the Kafka cluster, you can provide external access to a client on a different OpenShift namespace or outside OpenShift entirely.
You configure an external Kafka listener to provide the access.
The following external listener types are supported:
- route to use OpenShift Route and the default HAProxy router
- loadbalancer to use loadbalancer services
- nodeport to use ports on OpenShift nodes
- ingress to use OpenShift Ingress and the NGINX Ingress Controller for Kubernetes
The type you choose depends on your requirements, environment, and infrastructure. For example, load balancers might not be suitable for certain infrastructure, such as bare metal, where node ports provide a better option.
In this procedure:
- An external listener is configured for the Kafka cluster, with TLS encryption and authentication, and Kafka simple authorization is enabled.
- A KafkaUser is created for the client, with TLS authentication and Access Control Lists (ACLs) defined for simple authorization.
You can configure your listener to use TLS or SCRAM-SHA-512 authentication, both of which can be used with TLS encryption. If you are using an authorization server, you can use token-based OAuth 2.0 authentication and OAuth 2.0 authorization. Open Policy Agent (OPA) authorization is also supported as a Kafka authorization option.
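For example, a listener that uses SCRAM-SHA-512 authentication instead of mutual TLS might look like the following sketch; the listener name, port, and type are chosen for illustration:

listeners:
  - name: external
    port: 9094
    type: route
    tls: true
    authentication:
      type: scram-sha-512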
When you configure the KafkaUser authentication and authorization mechanisms, ensure they match the equivalent Kafka configuration:
- KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
- KafkaUser.spec.authorization matches Kafka.spec.kafka.authorization

You should have at least one listener supporting the authentication you want to use for the KafkaUser.
Authentication between Kafka users and Kafka brokers depends on the authentication settings for each. For example, it is not possible to authenticate a user with TLS if it is not also enabled in the Kafka configuration.
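For example, the following sketch pairs a listener and a user that both use TLS authentication; the listener name, port, and type are illustrative:

# Kafka resource: the listener defines the authentication mechanism
spec:
  kafka:
    listeners:
      - name: external
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
---
# KafkaUser resource: the user must declare a matching type
spec:
  authentication:
    type: tls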
AMQ Streams operators automate the configuration process:
- The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
- The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.
In this procedure, the certificates generated by the Cluster Operator are used, but you can replace them by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external Certificate Authority.
Certificates are available in PKCS #12 (.p12) and PEM (.crt) formats.
Prerequisites
- The Kafka cluster is available for the client
- The Cluster Operator and User Operator are running in the cluster
- A client outside the OpenShift cluster to connect to the Kafka cluster
Procedure
Configure the Kafka cluster with an external Kafka listener.
- Define the authentication required to access the Kafka broker through the listener
- Enable authorization on the Kafka broker
For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    listeners: 1
    - name: external 2
      port: 9094 3
      type: LISTENER-TYPE 4
      tls: true 5
      authentication:
        type: tls 6
      configuration:
        preferredNodePortAddressType: InternalDNS 7
        # bootstrap and broker service overrides 8
      #...
    authorization: 9
      type: simple
      superUsers:
        - super-user-name 10
  # ...
1. Configuration options for enabling external listeners are described in the Generic Kafka listener schema reference.
2. Name to identify the listener. The name must be unique within the Kafka cluster.
3. Port number used by the listener inside Kafka. The port number must be unique within a given Kafka cluster. Allowed port numbers are 9092 and higher, with the exception of ports 9404 and 9999, which are already used for Prometheus and JMX. Depending on the listener type, the port number might not be the same as the port number that Kafka clients use to connect.
4. External listener type, specified as route, loadbalancer, nodeport, or ingress. An internal listener is specified as internal.
5. Enables TLS encryption on the listener. Default is false. TLS encryption is not required for route listeners.
6. Authentication specified as tls.
7. (Optional, for nodeport listeners only) Configuration to specify a preference for the first address type used by AMQ Streams as the node address.
8. (Optional) AMQ Streams automatically determines the addresses to advertise to clients. The addresses are automatically assigned by OpenShift. You can override the bootstrap and broker service addresses if the infrastructure on which you are running AMQ Streams does not provide the right addresses. Validation is not performed on the overrides. The override configuration differs according to the listener type; for example, you can override hosts for route, DNS names or IP addresses for loadbalancer, and node ports for nodeport.
9. Authorization specified as simple, which uses the AclAuthorizer Kafka plugin.
10. (Optional) Super users can access all brokers regardless of any access restrictions defined in ACLs.
Warning: An OpenShift Route address comprises the name of the Kafka cluster, the name of the listener, and the name of the namespace it is created in. For example, my-cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE). If you are using a route listener type, be careful that the whole length of the address does not exceed a maximum limit of 63 characters.
Create or update the Kafka resource.

oc apply -f KAFKA-CONFIG-FILE
The Kafka cluster is configured with a Kafka broker listener using TLS authentication.

A service is created for each Kafka broker pod.

A service is created to serve as the bootstrap address for connection to the Kafka cluster.

A service is also created as the external bootstrap address for external connection to the Kafka cluster using nodeport listeners.

The cluster CA certificate to verify the identity of the Kafka brokers is also created with the same name as the Kafka resource.

Find the bootstrap address and port from the status of the Kafka resource.

oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'
Use the bootstrap address in your Kafka client to connect to the Kafka cluster.
Extract the public cluster CA certificate and password from the generated KAFKA-CLUSTER-NAME-cluster-ca-cert Secret.

oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12

oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
Use the certificate and password in your Kafka client to connect to the Kafka cluster with TLS encryption.
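If your client requires a Java keystore (JKS) truststore rather than PKCS #12, you can extract the PEM certificate from the same Secret and import it with keytool; a minimal sketch, where the truststore path and password are illustrative:

oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
keytool -importcert -trustcacerts -noprompt -alias ca -file ca.crt -keystore truststore.jks -storepass TRUSTSTORE-PASSWORD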
Note: Cluster CA certificates renew automatically by default. If you are using your own Kafka listener certificates, you will need to renew the certificates manually.
Create or modify a user representing the client that requires access to the Kafka cluster.
- Specify the same authentication type as the Kafka listener.
- Specify the authorization ACLs for simple authorization.

For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster 1
spec:
  authentication:
    type: tls 2
  authorization:
    type: simple
    acls: 3
    - resource:
        type: topic
        name: my-topic
        patternType: literal
      operation: Read
    - resource:
        type: topic
        name: my-topic
        patternType: literal
      operation: Describe
    - resource:
        type: group
        name: my-group
        patternType: literal
      operation: Read

1. The label must match the name of the Kafka cluster so that the user is managed by the User Operator for that cluster.
2. Authentication specified as tls, matching the authentication type of the Kafka listener.
3. ACLs defining the access the client needs for simple authorization.
Create or modify the KafkaUser resource.

oc apply -f USER-CONFIG-FILE

The user is created, as well as a Secret with the same name as the KafkaUser resource. The Secret contains a private and public key for TLS client authentication.

For example:
apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA
  user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER
  user.key: PRIVATE-KEY-OF-USER
  user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS
  user.password: PASSWORD-PROTECTING-P12-ARCHIVE
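As with the cluster CA certificate, you can extract the client credentials from the Secret for use outside OpenShift; for example:

oc get secret my-user -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
oc get secret my-user -o jsonpath='{.data.user\.password}' | base64 -d > user.password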
Configure your client with the properties required to make a secure connection to the Kafka cluster.
Add the authentication details for the public cluster certificates:
security.protocol: SSL 1
ssl.truststore.location: PATH-TO/ssl/keys/truststore 2
ssl.truststore.password: CLUSTER-CA-CERT-PASSWORD 3
ssl.truststore.type: PKCS12 4
1. Enables TLS encryption (with or without TLS client authentication).
2. Specifies the truststore location where the certificates were imported.
3. Specifies the password for accessing the truststore. This property can be omitted if it is not needed by the truststore.
4. Identifies the truststore type.
Note: Use security.protocol: SASL_SSL when using SCRAM-SHA authentication over TLS.

Add the bootstrap address and port for connecting to the Kafka cluster:
bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
Add the authentication details for the public user certificates:
ssl.keystore.location: PATH-TO/ssl/keys/user1.keystore 1
ssl.keystore.password: USER-CERT-PASSWORD 2

1. Specifies the keystore location where the certificates were imported.
2. Specifies the password for accessing the keystore.
The public user certificate is signed by the client CA when it is created.
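Putting the properties together, a complete client configuration might look like the following sketch; the bootstrap address, paths, and passwords are illustrative, and the keystore settings assume the PKCS #12 archive extracted from the user Secret:

bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
security.protocol: SSL
ssl.truststore.location: PATH-TO/ca.p12
ssl.truststore.password: CLUSTER-CA-CERT-PASSWORD
ssl.truststore.type: PKCS12
ssl.keystore.location: PATH-TO/user.p12
ssl.keystore.password: USER-CERT-PASSWORD
ssl.keystore.type: PKCS12

You can pass such a file to one of the console clients to verify the connection, for example:

bin/kafka-console-consumer.sh --bootstrap-server BOOTSTRAP-ADDRESS:PORT --topic my-topic --from-beginning --consumer.config CONFIG-FILE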