12.2. 生成重新分配 JSON 文件来重新分配分区
使用 kafka-reassign-partitions.sh
工具生成重新分配 JSON 文件,以便在扩展 Kafka 集群后重新分配分区。添加或删除代理不会自动重新分发现有分区。要平衡分区分发并充分利用新代理,您可以使用 kafka-reassign-partitions.sh
工具重新分配分区。
您可以从连接到 Kafka 集群的交互式 pod 容器运行该工具。
以下流程描述了使用 mTLS 的安全重新分配过程。您需要一个使用 TLS 加密和 mTLS 身份验证的 Kafka 集群。
您需要以下内容来建立连接:
- 当 Kafka 集群时,Cluster Operator 生成的集群 CA 证书和密钥
- 当用户为客户端访问 Kafka 集群的用户创建时,User Operator 生成的用户 CA 证书和密钥
在此过程中,CA 证书和对应的密码会从集群和包含它们的用户 secret 中提取,这些 secret 以 PKCS #12 (.p12
和 .password
) 的格式提取。密码允许访问包含证书的 .p12
存储。您可以使用 .p12
存储来指定信任存储和密钥存储来验证与 Kafka 集群的连接。
先决条件
- 您有一个正在运行的 Cluster Operator。
您有一个基于配置了内部 TLS 加密和 mTLS 身份验证的
Kafka
资源运行 Kafka 集群。使用 TLS 加密和 mTLS 身份验证配置 Kafka
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... listeners: # ... - name: tls port: 9093 type: internal tls: true 1 authentication: type: tls 2 # ...
正在运行的 Kafka 集群包含一组要重新分配的主题和分区。
my-topic
的主题配置示例apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 10 replicas: 3 config: retention.ms: 7200000 segment.bytes: 1073741824 # ...
有一个配置了 ACL 规则的
KafkaUser
,它用于指定从 Kafka 代理生成和使用主题的权限。带有 ACL 规则的 Kafka 用户配置示例,允许对
my-topic
和my-cluster
执行操作apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: 1 type: tls authorization: type: simple 2 acls: # access to the topic - resource: type: topic name: my-topic operations: - Create - Describe - Read - AlterConfigs host: "*" # access to the cluster - resource: type: cluster operations: - Alter - AlterConfigs host: "*" # ... # ...
流程
从 Kafka 集群的 <
cluster_name> -cluster-ca-cert
secret 中提取集群 CA 证书和密钥。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
将 <cluster_name > 替换为 Kafka 集群的名称。当使用 Kafka 资源部署 Kafka 时,会使用
Kafka
集群名称(<cluster_name> -cluster-ca-cert)创建带有集群
CA 证书的 secret。例如,my-cluster-cluster-ca-cert
。使用 AMQ Streams Kafka 镜像运行新的交互式 pod 容器,以连接到正在运行的 Kafka 代理。
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
将 <interactive_pod_name > 替换为 pod 的名称。
将集群 CA 证书复制到交互式 pod 容器。
oc cp ca.p12 <interactive_pod_name>:/tmp
从具有访问 Kafka 代理权限的 Kafka 用户的 secret 中提取用户 CA 用户和密码。
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password
将 <kafka_user > 替换为 Kafka 用户的名称。当使用
KafkaUser
资源创建 Kafka 用户时,会使用 Kafka 用户名创建带有用户 CA 证书的 secret。例如,my-user
。将用户 CA 证书复制到交互式 pod 容器。
oc cp user.p12 <interactive_pod_name>:/tmp
CA 证书允许交互式 pod 容器使用 TLS 连接到 Kafka 代理。
创建
config.properties
文件,以指定用于验证与 Kafka 集群的连接的信任存储和密钥存储。使用您在上一步中提取的证书和密码。
bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1 security.protocol=SSL 2 ssl.truststore.location=/tmp/ca.p12 3 ssl.truststore.password=<truststore_password> 4 ssl.keystore.location=/tmp/user.p12 5 ssl.keystore.password=<keystore_password> 6
将
config.properties
文件复制到交互式 pod 容器。oc cp config.properties <interactive_pod_name>:/tmp/config.properties
准备名为
topics.json
的 JSON 文件,以指定要移动的主题。将主题名称指定为用逗号分开的列表。
重新分配
my-topic
的所有分区的 JSON 文件示例{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
您还可以使用此文件 更改主题的复制因素。
将
topics.json
文件复制到交互式 pod 容器。oc cp topics.json <interactive_pod_name>:/tmp/topics.json
在交互式 pod 容器中启动 shell 进程。
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
将 <namespace > 替换为运行 pod 的 OpenShift 命名空间。
使用
kafka-reassign-partitions.sh
命令生成重新分配 JSON。将
my-topic
分区移到指定的代理的命令示例bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate