19.2. 生成重新分配 JSON 文件来重新分配分区
使用 kafka-reassign-partitions.sh
工具生成重新分配 JSON 文件,以便在扩展 Kafka 集群后重新分配分区。添加或删除代理不会自动重新分发现有分区。要平衡分区分发并充分利用新代理,您可以使用 kafka-reassign-partitions.sh
工具重新分配分区。
您可以从连接到 Kafka 集群的交互式 pod 容器运行该工具。
以下流程描述了使用 mTLS 的安全重新分配过程。您需要一个使用 TLS 加密和 mTLS 身份验证的 Kafka 集群。
您需要以下内容来建立连接:
- 当 Kafka 集群时,Cluster Operator 生成的集群 CA 证书和密钥
- 当用户为客户端访问 Kafka 集群的用户创建时,User Operator 生成的用户 CA 证书和密钥
在此过程中,CA 证书和对应的密码会从集群和包含它们的用户 secret 中提取,这些 secret 以 PKCS #12 (.p12
和 .password
) 的格式提取。密码允许访问包含证书的 .p12
存储。您可以使用 .p12
存储来指定信任存储和密钥存储来验证与 Kafka 集群的连接。
先决条件
- 您有一个正在运行的 Cluster Operator。
您有一个基于配置了内部 TLS 加密和 mTLS 身份验证的
Kafka
资源运行 Kafka 集群。使用 TLS 加密和 mTLS 身份验证配置 Kafka
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 正在运行的 Kafka 集群包含一组要重新分配的主题和分区。
my-topic
的主题配置示例Copy to Clipboard Copied! Toggle word wrap Toggle overflow 有一个配置了 ACL 规则的
KafkaUser
,它用于指定从 Kafka 代理生成和使用主题的权限。带有 ACL 规则的 Kafka 用户配置示例,允许对
my-topic
和my-cluster
执行操作Copy to Clipboard Copied! Toggle word wrap Toggle overflow
流程
从 Kafka 集群的 <
cluster_name> -cluster-ca-cert
secret 中提取集群 CA 证书和密钥。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <cluster_name > 替换为 Kafka 集群的名称。当使用 Kafka 资源部署 Kafka 时,会使用
Kafka
集群名称(<cluster_name> -cluster-ca-cert)创建带有集群
CA 证书的 secret。例如,my-cluster-cluster-ca-cert
。使用 AMQ Streams Kafka 镜像运行新的交互式 pod 容器,以连接到正在运行的 Kafka 代理。
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <interactive_pod_name > 替换为 pod 的名称。
将集群 CA 证书复制到交互式 pod 容器。
oc cp ca.p12 <interactive_pod_name>:/tmp
oc cp ca.p12 <interactive_pod_name>:/tmp
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从具有访问 Kafka 代理权限的 Kafka 用户的 secret 中提取用户 CA 用户和密码。
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password
oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <kafka_user > 替换为 Kafka 用户的名称。当使用
KafkaUser
资源创建 Kafka 用户时,会使用 Kafka 用户名创建带有用户 CA 证书的 secret。例如,my-user
。将用户 CA 证书复制到交互式 pod 容器。
oc cp user.p12 <interactive_pod_name>:/tmp
oc cp user.p12 <interactive_pod_name>:/tmp
Copy to Clipboard Copied! Toggle word wrap Toggle overflow CA 证书允许交互式 pod 容器使用 TLS 连接到 Kafka 代理。
创建
config.properties
文件,以指定用于验证与 Kafka 集群的连接的信任存储和密钥存储。使用您在上一步中提取的证书和密码。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将
config.properties
文件复制到交互式 pod 容器。oc cp config.properties <interactive_pod_name>:/tmp/config.properties
oc cp config.properties <interactive_pod_name>:/tmp/config.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 准备名为
topics.json
的 JSON 文件,以指定要移动的主题。将主题名称指定为用逗号分开的列表。
重新分配
my-topic
的所有分区的 JSON 文件示例Copy to Clipboard Copied! Toggle word wrap Toggle overflow 您还可以使用此文件 更改主题的复制因素。
将
topics.json
文件复制到交互式 pod 容器。oc cp topics.json <interactive_pod_name>:/tmp/topics.json
oc cp topics.json <interactive_pod_name>:/tmp/topics.json
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在交互式 pod 容器中启动 shell 进程。
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将 <namespace > 替换为运行 pod 的 OpenShift 命名空间。
使用
kafka-reassign-partitions.sh
命令生成重新分配 JSON。将
my-topic
分区移到指定的代理的命令示例bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate
bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate
Copy to Clipboard Copied! Toggle word wrap Toggle overflow