7.2. 为 OpenShift 外部的客户端设置访问权限
此流程演示了如何从外部 OpenShift 配置对 Kafka 集群的客户端访问。
使用 Kafka 集群的地址,您可以提供对不同 OpenShift 命名空间或 OpenShift 之外的客户端的外部访问权限。
您可以配置外部 Kafka 侦听器来提供访问。
支持以下外部监听程序类型:
-
使用 OpenShift
Route和默认 HAProxy 路由器的路由 -
LoadBalancer使用负载均衡器服务 -
NodePort使用 OpenShift 节点上的端口 -
Ingress使用 OpenShift Ingress 和 NGINX Ingress Controller for Kubernetes
所选的类型取决于您的要求,以及您的环境和基础架构。例如,负载均衡器可能不适用于某些基础架构,如裸机,节点端口提供了更好的选项。
在这个流程中:
- 为 Kafka 集群配置外部监听程序,启用 TLS 加密和身份验证,并且 Kafka 简单授权。
-
为客户端创建
KafkaUser,其使用为 简单授权 定义的 TLS 身份验证和访问控制列表(ACL)创建。
您可以将您的侦听器配置为使用 TLS、SCRAM-SHA-512 或 OAuth 2.0 身份验证。TLS 始终使用加密,但建议使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证的加密。
您可以为 Kafka 代理配置简单的 OAuth 2.0、OPA 或自定义授权。启用后,授权将应用到所有启用的监听程序。
当您配置 KafkaUser 身份验证和授权机制时,请确保它们与对等的 Kafka 配置匹配:
-
KafkaUser.spec.authentication匹配Kafka.spec.kafka.listeners[*].authentication -
KafkaUser.spec.authorization匹配Kafka.spec.kafka.authorization
您应该至少有一个监听程序支持您要用于 KafkaUser 的身份验证。
Kafka 用户和 Kafka 代理之间的身份验证取决于每个验证设置。例如,如果 Kafka 配置中还没有启用,则无法使用 TLS 验证用户。
AMQ Streams operator 会自动完成配置过程:
- Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
- User Operator 根据所选的身份验证类型创建代表客户端和用于客户端身份验证的安全凭证的用户。
在这一流程中,会使用 Cluster Operator 生成的证书,但您可以 通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部证书颁发机构管理的 Kafka 侦听器证书。
证书包括在 PKCS #12 (.p12)和 PEM (.crt)格式。这个过程显示 PKCS #12 证书。
先决条件
- Kafka 集群可用于客户端
- Cluster Operator 和 User Operator 在集群中运行
- OpenShift 集群外的客户端连接到 Kafka 集群
流程
使用外部Kafka 侦听器配置 Kafka 集群。- 定义通过监听程序访问 Kafka 代理所需的身份验证
在 Kafka 代理中启用授权
例如:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners:1 - name: external2 port: 90943 type: LISTENER-TYPE4 tls: true5 authentication: type: tls6 configuration: preferredNodePortAddressType: InternalDNS7 bootstrap and broker service overrides8 #... authorization:9 type: simple superUsers: - super-user-name10 # ...- 1
- 有关启用外部监听器的配置选项,请参考 Generic Kafka 侦听器 schema。
- 2
- 用于标识监听程序的名称。必须在 Kafka 集群中唯一。
- 3
- Kafka 内监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许的端口号为 9092 及更高版本,除了端口 9404 和 9999 除外,后者已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 4
- 将外部监听程序类型指定为
路由、loadbalancer、nodeport或ingress。内部侦听器指定为内部。 - 5
- 在监听器上启用 TLS 加密。默认为
false。路由监听器不需要 TLS 加密。 - 6
- 指定为
tls的身份验证。 - 7
- (可选,仅适用于
nodeport监听程序)配置 ,为 AMQ Streams 使用的第一个地址类型指定为节点地址。 - 8
- (可选)AMQ Streams 会自动决定向客户端公告的地址。地址由 OpenShift 自动分配。如果您正在运行 AMQ Streams 的基础架构没有提供正确的地址,则可以覆盖 bootstrap 和代理服务 地址。验证在覆盖中不会执行。覆盖配置根据监听器类型而有所不同。例如,您可以覆盖
路由、DNS 名称或 IP 地址的主机以用于负载均衡器,以及节点端口的节点端口。 - 9
- 授权指定为
simple,它使用AclAuthorizerKafka 插件。 - 10
- (可选)Super 用户可以访问所有代理,无论 ACL 中定义的任何访问限制如何。
警告OpenShift Route 地址由 Kafka 集群的名称、侦听器的名称以及在其中创建的命名空间的名称组成。例如,
my-cluster-kafka-listener1-bootstrap-myproject(CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用路由监听程序类型,请注意地址的整个长度不超过 63 个字符的最大值。
创建或更新
Kafka资源。oc apply -f <kafka_configuration_file>Kafka 集群使用 TLS 身份验证配置 Kafka 代理监听程序。
为每个 Kafka 代理 pod 创建服务。
创建服务作为连接到 Kafka 集群的 bootstrap 地址。
一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用
nodeport监听器。在 secret <cluster
_name> -cluster-ca-cert 中也创建了用于验证 kafka 代理身份的集群CA 证书。注意如果您在使用外部监听程序时扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。
从
Kafka资源的状态查找 bootstrap 地址和端口。oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}'使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。
创建或修改代表需要访问 Kafka 集群的客户端的用户。
-
指定与
Kafka监听器相同的验证类型。 指定用于简单授权的授权 ACL。
例如:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster1 spec: authentication: type: tls2 authorization: type: simple acls:3 - resource: type: topic name: my-topic patternType: literal operation: Read - resource: type: topic name: my-topic patternType: literal operation: Describe - resource: type: group name: my-group patternType: literal operation: Read
-
指定与
创建或修改
KafkaUser资源。oc apply -f USER-CONFIG-FILE创建用户,以及与
KafkaUser资源名称相同的 Secret。Secret 包含 TLS 客户端身份验证的私钥和公钥。例如:
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER user.key: PRIVATE-KEY-OF-USER user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS user.password: PASSWORD-PROTECTING-P12-ARCHIVE将公共集群 CA 证书提取到所需证书格式:
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12从密码文件中提取密码:
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password使用公共集群证书的身份验证详情配置客户端:
客户端代码示例
properties.put("security.protocol","SSL");1 properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12");2 properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD);3 properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12");4 注意在通过 TLS 使用 SCRAM-SHA 验证时,使用
security.protocol: SASL_SSL。将用户 CA 证书从用户 Secret 提取到所需证书格式:
oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12从密码文件中提取密码:
oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.password使用用户 CA 证书的验证详情配置您的客户端:
客户端代码示例
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12");1 properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>");2 properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12");3 添加用于连接 Kafka 集群的 bootstrap 地址和端口:
bootstrap.servers: BOOTSTRAP-ADDRESS:PORT