搜索

第 3 章 使用 Record Encryption 过滤器部署 Apache Kafka 代理的流

download PDF

Apache Kafka 代理的流旨在与由 Streams for Apache Kafka 管理的 Kafka 集群无缝集成。另外,它还提供与所有类型的 Kafka 实例的兼容性,无论它们的发布或协议版本是什么。无论您的部署涉及公共云或私有云,还是要设置本地开发环境,本指南中的说明适用于所有情况。

在此过程中,Apache Kafka Proxy 的 Streams 使用 Record Encryption 过滤器部署,以用于 OpenShift 中由 Apache Kafka 管理的 Kafka 实例。

Apache Kafka 的 Streams 提供了示例安装工件,其中包含 Apache Kafka Proxy 的 Streams 所需的配置,以连接到 examples/proxy/record-encryption 文件夹中的 Kafka 集群。

使用示例配置文件,使用以下监听程序类型部署和公开代理:

  • 使用每个代理的 ClusterIP 服务的 cluster-ip 类型监听程序在 OpenShift 集群中公开代理
  • 使用 per-broker loadbalancer 服务在 OpenShift 集群外公开代理的 LoadBalancer 类型监听程序

对于每个选项,提供了以下文件:

  • kustomization.yaml 指定用于部署代理的 Kubernetes 自定义
  • proxy-config.yaml 指定代理的 ConfigMap 资源配置
  • proxy-service.yaml 指定代理服务的 Service 资源配置

ConfigMap 资源提供设置虚拟集群和过滤器的配置。虚拟主机代表您要与代理一起使用的 Kafka 集群。

先决条件

  • 运行受支持的版本的 OpenShift 集群。
  • 由 Streams for Apache Kafka 管理的 Kafka 集群在 OpenShift 集群中运行的。
  • 本地安装的 Kafka 二进制文件,以验证通过 loadbalancer 进行外部访问的代理设置。Kafka 二进制文件包含在 RHEL for Apache Kafka 软件下载页面 的 Streams for Apache Kafka 的安装工件 中。
  • 包含用于创建虚拟设备和过滤器的配置的配置映射。
  • oc 命令行工具已安装并配置为连接到具有 admin 访问权限的 OpenShift 集群。
  • helm 命令行工具已安装并配置为连接到具有 admin 访问权限的 OpenShift 集群。
  • HashiCorp Vault 为代理设置,可从 Streams for Apache Kafka Proxy 访问。

    确保为 Record Encryption 过滤器设置了 Vault 实例。

有关此流程中使用的 ochelm 命令行选项的信息,请查看 --help

除了为 Apache Kafka 代理安装流的文件外,Apache Kafka Proxy 的 Streams 还提供预配置的文件来安装 Kafka 集群。安装文件提供了设置和尝试代理的最快速方法,但您可以使用自己的由 Streams for Apache Kafka 和 Vault 管理的 Kafka 集群部署。

在此过程中,我们连接到名为 my-cluster 的 Kafka 集群,部署到 kafka 命名空间。要将代理部署到与 Apache Kafka 中由 Streams 管理的集群相同的命名空间,请更改 kustomization.yaml 文件中的 namespace 设置。代理默认部署到 代理 命名空间中。如果保留此设置,则必须在集群范围内安装 Apache Kafka Operator 的 Streams。

流程

  1. 下载并提取 Apache Kafka 代理安装工件的 Streams。

    工件包含在安装以及 Apache Kafka 软件下载页面的 Streams 中提供的示例文件。

    文件包含通过 cluster-iploadbalancer 类型监听程序连接所需的部署配置。

  2. 在 Kafka 集群中创建主题:

    oc run -n <my_proxy_namespace> -ti proxy-producer \
      --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \
      --rm=true \
      --restart=Never \
      -- bin/kafka-topics.sh \
      --bootstrap-server proxy-service:9092 \
      --create -topic my-topic

    在本例中,我们通过交互式 pod 容器创建一个名为 my-topic 的主题。

  3. 在 Vault 中为 my-topic 创建密钥:

    vault write -f transit/keys/KEK_my-topic
  4. 编辑为代理提供过滤器配置的 ConfigMap

    Record Encryption 过滤器 配置需要 HashiCorp Vault KMS 的凭证。

    记录加密过滤器配置示例

    filters:
      - type: RecordEncryption
        config:
          kms: VaultKmsService 1
          kmsConfig:
            vaultTransitEngineUrl: http://vault.vault.svc.cluster.local:8200/v1/transit 2
            vaultToken:
              passwordFile: /opt/proxy/encryption/token.txt 3
          selector: TemplateKekSelector 4
          selectorConfig:
            template: "KEK_${topicName}" 5
    	 # ...

    1
    使用的 KMS (密钥管理系统)的类型。在本例中,HashiCorp Vault。
    2
    Vault Transit Engine 服务的 URL。
    3
    包含访问 Vault 服务所需的令牌的文件。如果此位置发生变化,则代理部署配置中需要进行等同的更改。
    4
    要使用的密钥加密密钥(KEK)选择器。${topicName} 是代理理解的字面值。
    5
    基于特定主题名称派生的 KEK 模板。
  5. 使用 Record Encryption 过滤器和适当的监听程序为 OpenShift 集群部署 Apache Kafka 代理的流:

    使用 cluster-ip 侦听器部署代理

    cd /examples/proxy/record-encryption/
    oc apply -k cluster-ip

    使用 loadbalancer 侦听器部署代理

    cd /examples/proxy/record-encryption/
    oc apply -k loadbalancer

  6. 如果您使用 loadbalancer 侦听器,请更新代理配置以使用所创建的 loadbalancer 服务地址。

    1. 获取代理服务的外部地址:

      LOAD_BALANCER_ADDRESS=$(oc get service -n <my_proxy_namespace> proxy-service --template='{{(index .status.loadBalancer.ingress 0).hostname}}')
    2. 更新代理服务配置中的 brokerAddressPattern 属性以使用代理地址:

      sed -i "s/\(brokerAddressPattern:\).*$/\1 ${LOAD_BALANCER_ADDRESS}/" load-balancer/proxy/proxy-config.yaml
    3. 将更改应用到代理配置并重启代理 pod。

       oc apply -k load-balancer && oc delete pod -n <my_proxy_namespace> --all
  7. 检查部署的状态:

    oc get pods -n <my_proxy_namespace>

    输出显示部署名称和就绪

    NAME                      READY  STATUS   RESTARTS
    my-cluster-kafka-0        1/1    Running  0
    my-cluster-kafka-1        1/1    Running  0
    my-cluster-kafka-2        1/1    Running  0
    my-cluster-proxy-<pod_id> 1/1    Running  0

    my-cluster-proxy 是代理的名称。

    标识创建的 pod 的 pod ID。

    使用默认部署,您可以安装单个代理 pod。

    READY 显示就绪/预期的副本数。当 STATUS 显示为 Running 时,部署成功。

  8. 通过代理生成消息,然后直接从 Kafka 集群使用和间接使用,验证加密是否已应用到指定的主题。

3.1. 在使用 cluster-ip 类型监听程序时验证代理

在使用 cluster-ip 类型监听程序时,为 OpenShift 集群中的 Kafka producer 和消费者运行交互式 pod 容器,以验证代理是否正常工作。

  1. 从代理生成消息:

    通过代理生成消息

    oc run -n <my_proxy_namespace> -ti proxy-producer \
      --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \
      --rm=true \
      --restart=Never \
      -- bin/kafka-console-producer.sh \
      --bootstrap-server proxy-service:9092 \
      --topic my-topic

  2. 直接从 Kafka 集群使用消息来显示它们已被加密:

    直接从 Kafka 集群使用消息

    oc run -n my-cluster -ti cluster-consumer \
      --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \
      --rm=true \
      --restart=Never
      -- ./bin/kafka-console-consumer.sh  \
      --bootstrap-server my-cluster-kafka-bootstrap:9092 \
      --topic my-topic \
      --from-beginning \
      --timeout-ms 10000

  3. 使用代理的消息来自动解密它们:

    直接从 Kafka 集群使用消息

    oc run -n <my_proxy_namespace> -ti proxy-consumer \
      --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \
      --rm=true \
      --restart=Never \
      -- ./bin/kafka-console-consumer.sh \
      --bootstrap-server proxy-service:9092 \
      --topic my-topic --from-beginning --timeout-ms 10000

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.