19.2. 生成重新分配 JSON 文件来重新分配分区
使用 kafka-reassign-partitions.sh 工具生成一个重新分配 JSON 文件,以便在扩展 Kafka 集群后重新分配分区。添加或删除代理不会自动重新分发现有分区。要平衡分区分布并充分利用新代理,您可以使用 kafka-reassign-partitions.sh 工具重新分配分区。
您可以从连接到 Kafka 集群的交互式 pod 容器运行该工具。
以下流程描述了使用 mTLS 的安全重新分配过程。您需要一个使用 TLS 加密和 mTLS 身份验证的 Kafka 集群。
您需要以下内容来建立连接:
- 创建 Kafka 集群时由 Cluster Operator 生成的集群 CA 证书和私钥
- 当用户为对 Kafka 集群进行客户端访问时,User Operator 生成的用户 CA 证书和私钥
在此过程中,CA 证书和对应的密码会从集群和包含它们的用户 secret 中提取,这些 secret 以 PKCS #12 (.p12 和 .password) 的格式提取。密码允许访问包含证书的 .p12 存储。您可以使用 .p12 存储指定一个信任存储和密钥存储来验证与 Kafka 集群的连接。
先决条件
- 您有一个正在运行的 Cluster Operator。
您有一个正在运行的 Kafka 集群,它基于配置了内部 TLS 加密和 mTLS 身份验证的
Kafka资源。使用 TLS 加密和 mTLS 验证的 Kafka 配置
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 正在运行的 Kafka 集群包含一组要重新分配的主题和分区。
my-topic的主题配置示例Copy to Clipboard Copied! Toggle word wrap Toggle overflow 有一个配置了 ACL 规则的
KafkaUser,它用于指定从 Kafka 代理生成和使用主题的权限。使用 ACL 规则的 Kafka 用户配置示例,允许对
my-topic和my-cluster执行操作Copy to Clipboard Copied! Toggle word wrap Toggle overflow
流程
从 Kafka 集群的 <
cluster_name> -cluster-ca-certsecret 中提取集群 CA 证书和密钥。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordoc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <cluster_name > 替换为 Kafka 集群的名称。当使用 Kafka 资源部署 Kafka 时,使用
Kafka集群名称(<cluster_name> -cluster-ca-cert)创建带有集群CA 证书的 secret。例如,my-cluster-cluster-ca-cert。使用 AMQ Streams Kafka 镜像运行新的交互式 pod 容器,以连接到正在运行的 Kafka 代理。
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <interactive_pod_name > 替换为 pod 的名称。
将集群 CA 证书复制到交互式 pod 容器。
oc cp ca.p12 <interactive_pod_name>:/tmp
oc cp ca.p12 <interactive_pod_name>:/tmpCopy to Clipboard Copied! Toggle word wrap Toggle overflow 从有权访问 Kafka 代理的 Kafka 用户的 secret 中提取用户 CA 证书和密钥。
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.passwordoc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <kafka_user > 替换为 Kafka 用户的名称。当使用
KafkaUser资源创建 Kafka 用户时,会使用 Kafka 用户名创建带有用户 CA 证书的 secret。例如,my-user。将用户 CA 证书复制到交互式 pod 容器。
oc cp user.p12 <interactive_pod_name>:/tmp
oc cp user.p12 <interactive_pod_name>:/tmpCopy to Clipboard Copied! Toggle word wrap Toggle overflow CA 证书允许交互式 pod 容器使用 TLS 连接到 Kafka 代理。
创建
config.properties文件,以指定用于验证与 Kafka 集群连接的信任存储和密钥存储。使用您在前面的步骤中提取的证书和密码。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将
config.properties文件复制到交互式 pod 容器中。oc cp config.properties <interactive_pod_name>:/tmp/config.properties
oc cp config.properties <interactive_pod_name>:/tmp/config.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow 准备名为
topics.json的 JSON 文件,以指定要移动的主题。将主题名称指定为用逗号分开的列表。
重新分配
my-topic的所有分区的 JSON 文件示例Copy to Clipboard Copied! Toggle word wrap Toggle overflow 您还可以使用此文件 更改主题 的复制因素。
将
topics.json文件复制到交互式 pod 容器中。oc cp topics.json <interactive_pod_name>:/tmp/topics.json
oc cp topics.json <interactive_pod_name>:/tmp/topics.jsonCopy to Clipboard Copied! Toggle word wrap Toggle overflow 在交互式 pod 容器中启动 shell 进程。
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bashCopy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <namespace > 替换为运行 pod 的 OpenShift 命名空间。
使用
kafka-reassign-partitions.sh命令生成重新分配 JSON。将
my-topic分区移到指定代理的示例bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate
bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generateCopy to Clipboard Copied! Toggle word wrap Toggle overflow