12.2. 生成重新分配 JSON 文件来重新分配分区


使用 kafka-reassign-partitions.sh 工具生成重新分配 JSON 文件,以便在扩展 Kafka 集群后重新分配分区。添加或删除代理不会自动重新分发现有分区。要平衡分区分发并充分利用新代理,您可以使用 kafka-reassign-partitions.sh 工具重新分配分区。

您可以从连接到 Kafka 集群的交互式 pod 容器运行该工具。

以下流程描述了使用 mTLS 的安全重新分配过程。您需要一个使用 TLS 加密和 mTLS 身份验证的 Kafka 集群。

您需要以下内容来建立连接:

  • 当 Kafka 集群时,Cluster Operator 生成的集群 CA 证书和密钥
  • 当用户为客户端访问 Kafka 集群的用户创建时,User Operator 生成的用户 CA 证书和密钥

在此过程中,CA 证书和对应的密码会从集群和包含它们的用户 secret 中提取,这些 secret 以 PKCS #12 (.p12.password) 的格式提取。密码允许访问包含证书的 .p12 存储。您可以使用 .p12 存储来指定信任存储和密钥存储来验证与 Kafka 集群的连接。

先决条件

  • 您有一个正在运行的 Cluster Operator。
  • 您有一个基于配置了内部 TLS 加密和 mTLS 身份验证的 Kafka 资源运行 Kafka 集群。

    使用 TLS 加密和 mTLS 身份验证配置 Kafka

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        listeners:
          # ...
          - name: tls
            port: 9093
            type: internal
            tls: true 1
            authentication:
              type: tls 2
        # ...

    1
    为内部监听程序启用 TLS 加密。
    2
    侦听器身份验证机制指定为 mutual tls
  • 正在运行的 Kafka 集群包含一组要重新分配的主题和分区。

    my-topic的主题配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
        # ...

  • 有一个配置了 ACL 规则的 KafkaUser,它用于指定从 Kafka 代理生成和使用主题的权限。

    带有 ACL 规则的 Kafka 用户配置示例,允许对 my-topicmy-cluster执行操作

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication: 1
        type: tls
      authorization:
        type: simple 2
        acls:
          # access to the topic
          - resource:
              type: topic
              name: my-topic
            operations:
              - Create
              - Describe
              - Read
              - AlterConfigs
            host: "*"
          # access to the cluster
          - resource:
              type: cluster
            operations:
              - Alter
              - AlterConfigs
            host: "*"
          # ...
      # ...

    1
    定义为 mutual tls 的用户身份验证机制。
    2
    简单的 ACL 规则授权和附带列表。

流程

  1. 从 Kafka 集群的 < cluster_name> -cluster-ca-cert secret 中提取集群 CA 证书和密钥。

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password

    <cluster_name > 替换为 Kafka 集群的名称。当使用 Kafka 资源部署 Kafka 时,会使用 Kafka 集群名称(<cluster_name> -cluster-ca-cert)创建带有集群CA 证书的 secret。例如,my-cluster-cluster-ca-cert

  2. 使用 AMQ Streams Kafka 镜像运行新的交互式 pod 容器,以连接到正在运行的 Kafka 代理。

    oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"

    <interactive_pod_name > 替换为 pod 的名称。

  3. 将集群 CA 证书复制到交互式 pod 容器。

    oc cp ca.p12 <interactive_pod_name>:/tmp
  4. 从具有访问 Kafka 代理权限的 Kafka 用户的 secret 中提取用户 CA 用户和密码。

    oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
    oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password

    <kafka_user > 替换为 Kafka 用户的名称。当使用 KafkaUser 资源创建 Kafka 用户时,会使用 Kafka 用户名创建带有用户 CA 证书的 secret。例如,my-user

  5. 将用户 CA 证书复制到交互式 pod 容器。

    oc cp user.p12 <interactive_pod_name>:/tmp

    CA 证书允许交互式 pod 容器使用 TLS 连接到 Kafka 代理。

  6. 创建 config.properties 文件,以指定用于验证与 Kafka 集群的连接的信任存储和密钥存储。

    使用您在上一步中提取的证书和密码。

    bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1
    security.protocol=SSL 2
    ssl.truststore.location=/tmp/ca.p12 3
    ssl.truststore.password=<truststore_password> 4
    ssl.keystore.location=/tmp/user.p12 5
    ssl.keystore.password=<keystore_password> 6
    1
    连接到 Kafka 集群的 bootstrap 服务器地址。使用您自己的 Kafka 集群名称替换 < kafka_cluster_name>
    2
    使用 TLS 加密时的安全协议选项。
    3
    truststore 位置包含了 Kafka 集群的公钥证书 (ca.p12)。
    4
    用于访问信任存储的密码(ca.password)。
    5
    密钥存储位置包含 Kafka 用户的公钥证书(user.p12)。
    6
    用于访问密钥存储的密码(user.password)。
  7. config.properties 文件复制到交互式 pod 容器。

    oc cp config.properties <interactive_pod_name>:/tmp/config.properties
  8. 准备名为 topics.json 的 JSON 文件,以指定要移动的主题。

    将主题名称指定为用逗号分开的列表。

    重新分配 my-topic的所有分区的 JSON 文件示例

    {
      "version": 1,
      "topics": [
        { "topic": "my-topic"}
      ]
    }

    您还可以使用此文件 更改主题的复制因素

  9. topics.json 文件复制到交互式 pod 容器。

    oc cp topics.json <interactive_pod_name>:/tmp/topics.json
  10. 在交互式 pod 容器中启动 shell 进程。

    oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash

    <namespace > 替换为运行 pod 的 OpenShift 命名空间。

  11. 使用 kafka-reassign-partitions.sh 命令生成重新分配 JSON。

    my-topic 分区移到指定的代理的命令示例

    bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list 0,1,2,3,4 \
      --generate

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.