Chapter 6. Setting up client access to the Kafka cluster
After you have deployed AMQ Streams, the procedures in this section explain how to:
- Deploy example producer and consumer clients, which you can use to verify your deployment
- Set up external client access to the Kafka cluster
The steps to set up access to the Kafka cluster for a client outside OpenShift are more complex, and require familiarity with the Kafka component configuration procedures described in the Using AMQ Streams on OpenShift guide.
6.1. Deploying example clients
This procedure shows how to deploy example producer and consumer clients that use the Kafka cluster you created to send and receive messages.
Prerequisites
- The Kafka cluster is available for the clients.
Procedure
Deploy a Kafka producer.
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
- Type a message into the console where the producer is running.
- Press Enter to send the message.
Deploy a Kafka consumer.
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
- Confirm that you see the incoming messages in the consumer console.
6.2. Setting up access for clients outside of OpenShift
This procedure shows how to configure client access to a Kafka cluster from outside OpenShift.
Using the address of the Kafka cluster, you can provide external access to a client on a different OpenShift namespace or outside OpenShift entirely.
You configure an external Kafka listener to provide the access.
The following external listener types are supported:
- route to use OpenShift Route and the default HAProxy router
- loadbalancer to use loadbalancer services
- nodeport to use ports on OpenShift nodes
- ingress to use OpenShift Ingress and the NGINX Ingress Controller for Kubernetes
Which type you choose depends on your requirements, your environment, and your infrastructure. For example, loadbalancers might not be suitable for certain infrastructure, such as bare metal, where node ports provide a better option.
In this procedure:
- An external listener is configured for the Kafka cluster, with TLS encryption and authentication, and Kafka simple authorization is enabled.
- A KafkaUser is created for the client, with TLS authentication and Access Control Lists (ACLs) defined for simple authorization.
You can configure your listener to use TLS, SCRAM-SHA-512 or OAuth 2.0 authentication. TLS always uses encryption, but it is recommended to also use encryption with SCRAM-SHA-512 and OAuth 2.0 authentication.
You can configure simple, OAuth 2.0, OPA or custom authorization for Kafka brokers. When enabled, authorization is applied to all enabled listeners.
When you configure the KafkaUser authentication and authorization mechanisms, ensure they match the equivalent Kafka configuration:
- KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
- KafkaUser.spec.authorization matches Kafka.spec.kafka.authorization
You should have at least one listener supporting the authentication you want to use for the KafkaUser.
Authentication between Kafka users and Kafka brokers depends on the authentication settings for each. For example, it is not possible to authenticate a user with TLS if it is not also enabled in the Kafka configuration.
AMQ Streams operators automate the configuration process:
- The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
- The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.
In this procedure, the certificates generated by the Cluster Operator are used, but you can replace them by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external Certificate Authority.
Certificates are available in PKCS #12 (.p12) and PEM (.crt) formats. This procedure shows PKCS #12 certificates.
Prerequisites
- The Kafka cluster is available for the client
- The Cluster Operator and User Operator are running in the cluster
- A client outside the OpenShift cluster to connect to the Kafka cluster
Procedure
Configure the Kafka cluster with an external Kafka listener.
- Define the authentication required to access the Kafka broker through the listener
- Enable authorization on the Kafka broker
For example:
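The example configuration was not reproduced here, so the following is a minimal sketch of the external listener and authorization settings described by the numbered annotations below. It assumes the v1beta2 AMQ Streams API; the cluster name (my-cluster), listener name (external), port, super user name, and the route listener type are placeholder choices:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster            # placeholder cluster name
spec:
  kafka:
    # ... broker replicas, storage, and other configuration ...
    listeners:
      - name: external        # listener name, unique within the cluster
        port: 9094            # internal listener port, 9092 or higher
        type: route           # or loadbalancer, nodeport, ingress
        tls: true             # TLS encryption on the listener
        authentication:
          type: tls           # TLS client authentication
    authorization:
      type: simple            # AclAuthorizer-based authorization
      superUsers:
        - super-user-name     # placeholder super user
  # ...
```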
1. Configuration options for enabling external listeners are described in the Generic Kafka listener schema reference.
2. Name to identify the listener. Must be unique within the Kafka cluster.
3. Port number used by the listener inside Kafka. The port number has to be unique within a given Kafka cluster. Allowed port numbers are 9092 and higher with the exception of ports 9404 and 9999, which are already used for Prometheus and JMX. Depending on the listener type, the port number might not be the same as the port number that connects Kafka clients.
4. External listener type specified as route, loadbalancer, nodeport or ingress. An internal listener is specified as internal.
5. Enables TLS encryption on the listener. Default is false. TLS encryption is not required for route listeners.
6. Authentication specified as tls.
7. (Optional, for nodeport listeners only) Configuration to specify a preference for the first address type used by AMQ Streams as the node address.
8. (Optional) AMQ Streams automatically determines the addresses to advertise to clients. The addresses are automatically assigned by OpenShift. You can override bootstrap and broker service addresses if the infrastructure on which you are running AMQ Streams does not provide the right address. Validation is not performed on the overrides. The override configuration differs according to the listener type. For example, you can override hosts for route, DNS names or IP addresses for loadbalancer, and node ports for nodeport.
9. Authorization specified as simple, which uses the AclAuthorizer Kafka plugin.
10. (Optional) Super users can access all brokers regardless of any access restrictions defined in ACLs.
Warning: An OpenShift Route address comprises the name of the Kafka cluster, the name of the listener, and the name of the namespace it is created in. For example, my-cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE). If you are using a route listener type, be careful that the whole length of the address does not exceed a maximum limit of 63 characters.
Create or update the Kafka resource.

oc apply -f KAFKA-CONFIG-FILE

The Kafka cluster is configured with a Kafka broker listener using TLS authentication.
A service is created for each Kafka broker pod.
A service is created to serve as the bootstrap address for connection to the Kafka cluster.
A service is also created as the external bootstrap address for external connection to the Kafka cluster using nodeport listeners.
The cluster CA certificate to verify the identity of the Kafka brokers is also created with the same name as the Kafka resource.
Find the bootstrap address and port from the status of the Kafka resource.

oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'

Use the bootstrap address in your Kafka client to connect to the Kafka cluster.
Create or modify a user representing the client that requires access to the Kafka cluster.
- Specify the same authentication type as the Kafka listener.
- Specify the authorization ACLs for simple authorization.
For example:
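The example resource was not reproduced here, so the following is a sketch of such a KafkaUser, assuming the v1beta2 AMQ Streams API. The user name, cluster label, topic, and consumer group are placeholders; the strimzi.io/cluster label must name the Kafka cluster the user belongs to:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user               # placeholder user name
  labels:
    strimzi.io/cluster: my-cluster   # name of the Kafka cluster
spec:
  authentication:
    type: tls                 # matches the tls authentication on the listener
  authorization:
    type: simple              # matches the simple authorization on the broker
    acls:
      # allow the client to consume from my-topic in group my-group
      - resource:
          type: topic
          name: my-topic
        operation: Read
      - resource:
          type: topic
          name: my-topic
        operation: Describe
      - resource:
          type: group
          name: my-group
        operation: Read
      # allow the client to produce to my-topic
      - resource:
          type: topic
          name: my-topic
        operation: Write
```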
Create or modify the KafkaUser resource.

oc apply -f USER-CONFIG-FILE

The user is created, as well as a Secret with the same name as the KafkaUser resource. The Secret contains a private and public key for TLS client authentication.
Extract the public cluster CA certificate to the desired certificate format:
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12

Extract the password from the password file:
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password

Configure your client with the authentication details for the public cluster certificates:
Sample client code
properties.put("security.protocol","SSL");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12");
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD);
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12");

1. Enables TLS encryption (with or without TLS client authentication).
2. Specifies the truststore location where the certificates were imported.
3. Specifies the password for accessing the truststore. This property can be omitted if it is not needed by the truststore.
4. Identifies the truststore type.
Note: Use security.protocol: SASL_SSL when using SCRAM-SHA authentication over TLS.
Extract the user CA certificate from the user Secret to the desired certificate format:
oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12

Extract the password from the password file:
oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.password

Configure your client with the authentication details for the user CA certificate:
Sample client code
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12");
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>");
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12");

Add the bootstrap address and port for connecting to the Kafka cluster:
bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
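Taken together, the connection details from the steps above can be collected into a single client configuration file. The following is a sketch assuming the PKCS #12 truststore and keystore extracted earlier; the bootstrap address, port, passwords, and file paths are placeholders:

```properties
# Bootstrap address from the status of the Kafka resource
bootstrap.servers=BOOTSTRAP-ADDRESS:PORT
# TLS encryption with TLS client authentication
security.protocol=SSL
ssl.truststore.location=/path/to/ca.p12
ssl.truststore.password=CA-PASSWORD
ssl.truststore.type=PKCS12
ssl.keystore.location=/path/to/user.p12
ssl.keystore.password=USER-PASSWORD
ssl.keystore.type=PKCS12
```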