第 3 章 使用 Record Encryption 过滤器部署 Apache Kafka 代理的流
Apache Kafka 代理的流旨在与由 Streams for Apache Kafka 管理的 Kafka 集群无缝集成。另外,它还提供与所有类型的 Kafka 实例的兼容性,无论它们的发布或协议版本是什么。无论您的部署涉及公共云或私有云,还是要设置本地开发环境,本指南中的说明适用于所有情况。
在此过程中,Apache Kafka Proxy 的 Streams 使用 Record Encryption 过滤器部署,以用于 OpenShift 中由 Apache Kafka 管理的 Kafka 实例。
Apache Kafka 的 Streams 提供了示例安装工件,其中包含 Apache Kafka Proxy 的 Streams 所需的配置,以连接到 examples/proxy/record-encryption
文件夹中的 Kafka 集群。
使用示例配置文件,使用以下监听程序类型部署和公开代理:
-
使用每个代理的
ClusterIP
服务的cluster-ip
类型监听程序在 OpenShift 集群中公开代理 -
使用 per-broker
loadbalancer
服务在
对于每个选项,提供了以下文件:
-
kustomization.yaml
指定用于部署代理的 Kubernetes 自定义 -
proxy-config.yaml
指定代理的ConfigMap
资源配置 -
proxy-service.yaml
指定代理服务的Service
资源配置
ConfigMap
资源提供设置虚拟集群和过滤器的配置。虚拟主机代表您要与代理一起使用的 Kafka 集群。
先决条件
- 运行受支持的版本的 OpenShift 集群。
- 由 Streams for Apache Kafka 管理的 Kafka 集群在 OpenShift 集群中运行的。
- 本地安装的 Kafka 二进制文件,以验证通过 loadbalancer 进行外部访问的代理设置。Kafka 二进制文件包含在 RHEL for Apache Kafka 软件下载页面 的 Streams for Apache Kafka 的安装工件 中。
- 包含用于创建虚拟设备和过滤器的配置的配置映射。
-
oc
命令行工具已安装并配置为连接到具有 admin 访问权限的 OpenShift 集群。 -
helm
命令行工具已安装并配置为连接到具有 admin 访问权限的 OpenShift 集群。 HashiCorp Vault 为代理设置,可从 Streams for Apache Kafka Proxy 访问。
确保为 Record Encryption 过滤器设置了 Vault 实例。
有关此流程中使用的 oc
和 helm
命令行选项的信息,请查看 --help
。
除了为 Apache Kafka 代理安装流的文件外,Apache Kafka Proxy 的 Streams 还提供预配置的文件来安装 Kafka 集群。安装文件提供了设置和尝试代理的最快速方法,但您可以使用自己的由 Streams for Apache Kafka 和 Vault 管理的 Kafka 集群部署。
在此过程中,我们连接到名为 my-cluster
的 Kafka 集群,部署到 kafka
命名空间。要将代理部署到与 Apache Kafka 中由 Streams 管理的集群相同的命名空间,请更改 kustomization.yaml
文件中的 namespace
设置。代理默认部署到 代理
命名空间中。如果保留此设置,则必须在集群范围内安装 Apache Kafka Operator 的 Streams。
流程
下载并提取 Apache Kafka 代理安装工件的 Streams。
工件包含在安装以及 Apache Kafka 软件下载页面的 Streams 中提供的示例文件。
文件包含通过
cluster-ip
或loadbalancer
类型监听程序连接所需的部署配置。在 Kafka 集群中创建主题:
oc run -n <my_proxy_namespace> -ti proxy-producer \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-topics.sh \ --bootstrap-server proxy-service:9092 \ --create -topic my-topic
在本例中,我们通过交互式 pod 容器创建一个名为
my-topic
的主题。在 Vault 中为
my-topic
创建密钥:vault write -f transit/keys/KEK_my-topic
编辑为代理提供过滤器配置的
ConfigMap
。Record Encryption
过滤器
配置需要 HashiCorp Vault KMS 的凭证。记录加密过滤器配置示例
filters: - type: RecordEncryption config: kms: VaultKmsService 1 kmsConfig: vaultTransitEngineUrl: http://vault.vault.svc.cluster.local:8200/v1/transit 2 vaultToken: passwordFile: /opt/proxy/encryption/token.txt 3 selector: TemplateKekSelector 4 selectorConfig: template: "KEK_${topicName}" 5 # ...
使用 Record Encryption 过滤器和适当的监听程序为 OpenShift 集群部署 Apache Kafka 代理的流:
使用
cluster-ip
侦听器部署代理cd /examples/proxy/record-encryption/ oc apply -k cluster-ip
使用
loadbalancer
侦听器部署代理cd /examples/proxy/record-encryption/ oc apply -k loadbalancer
如果您使用
loadbalancer
侦听器,请更新代理配置以使用所创建的 loadbalancer 服务地址。获取代理服务的外部地址:
LOAD_BALANCER_ADDRESS=$(oc get service -n <my_proxy_namespace> proxy-service --template='{{(index .status.loadBalancer.ingress 0).hostname}}')
更新代理服务配置中的
brokerAddressPattern
属性以使用代理地址:sed -i "s/\(brokerAddressPattern:\).*$/\1 ${LOAD_BALANCER_ADDRESS}/" load-balancer/proxy/proxy-config.yaml
将更改应用到代理配置并重启代理 pod。
oc apply -k load-balancer && oc delete pod -n <my_proxy_namespace> --all
检查部署的状态:
oc get pods -n <my_proxy_namespace>
输出显示部署名称和就绪
NAME READY STATUS RESTARTS my-cluster-kafka-0 1/1 Running 0 my-cluster-kafka-1 1/1 Running 0 my-cluster-kafka-2 1/1 Running 0 my-cluster-proxy-<pod_id> 1/1 Running 0
my-cluster-proxy
是代理的名称。标识创建的 pod 的 pod ID。
使用默认部署,您可以安装单个代理 pod。
READY 显示就绪/预期的副本数。当 STATUS 显示为 Running 时,部署成功。
- 通过代理生成消息,然后直接从 Kafka 集群使用和间接使用,验证加密是否已应用到指定的主题。
3.1. 在使用 cluster-ip
类型监听程序时验证代理
在使用 cluster-ip
类型监听程序时,为 OpenShift 集群中的 Kafka producer 和消费者运行交互式 pod 容器,以验证代理是否正常工作。
从代理生成消息:
通过代理生成消息
oc run -n <my_proxy_namespace> -ti proxy-producer \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh \ --bootstrap-server proxy-service:9092 \ --topic my-topic
直接从 Kafka 集群使用消息来显示它们已被加密:
直接从 Kafka 集群使用消息
oc run -n my-cluster -ti cluster-consumer \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never -- ./bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning \ --timeout-ms 10000
使用代理的消息来自动解密它们:
直接从 Kafka 集群使用消息
oc run -n <my_proxy_namespace> -ti proxy-consumer \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- ./bin/kafka-console-consumer.sh \ --bootstrap-server proxy-service:9092 \ --topic my-topic --from-beginning --timeout-ms 10000