20.2. 生成重新分配 JSON 文件来重新分配分区
使用 kafka-reassign-partitions.sh 工具生成一个重新分配 JSON 文件,以便在扩展 Kafka 集群后重新分配分区。添加或删除代理不会自动重新分发现有分区。要平衡分区分布并充分利用新代理,您可以使用 kafka-reassign-partitions.sh 工具重新分配分区。
您可以从连接到 Kafka 集群的交互式 pod 容器运行该工具。
以下流程描述了使用 mTLS 的安全重新分配过程。您需要一个使用 TLS 加密和 mTLS 身份验证的 Kafka 集群。
您需要以下内容来建立连接:
- 创建 Kafka 集群时由 Cluster Operator 生成的集群 CA 证书和私钥
- 当用户为对 Kafka 集群进行客户端访问时,User Operator 生成的用户 CA 证书和私钥
在此过程中,CA 证书和对应的密码会从集群和包含它们的用户 secret 中提取,这些 secret 以 PKCS #12 (.p12 和 .password) 的格式提取。密码允许访问包含证书的 .p12 存储。您可以使用 .p12 存储指定一个信任存储和密钥存储来验证与 Kafka 集群的连接。
先决条件
- 您有一个正在运行的 Cluster Operator。
您有一个正在运行的 Kafka 集群,它基于配置了内部 TLS 加密和 mTLS 身份验证的
Kafka资源。使用 TLS 加密和 mTLS 验证的 Kafka 配置
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... listeners: # ... - name: tls port: 9093 type: internal tls: true1 authentication: type: tls2 # ...正在运行的 Kafka 集群包含一组要重新分配的主题和分区。
my-topic的主题配置示例apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 10 replicas: 3 config: retention.ms: 7200000 segment.bytes: 1073741824 # ...有一个配置了 ACL 规则的
KafkaUser,它用于指定从 Kafka 代理生成和使用主题的权限。使用 ACL 规则的 Kafka 用户配置示例,允许对
my-topic和my-cluster执行操作apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication:1 type: tls authorization: type: simple2 acls: # access to the topic - resource: type: topic name: my-topic operations: - Create - Describe - Read - AlterConfigs host: "*" # access to the cluster - resource: type: cluster operations: - Alter - AlterConfigs host: "*" # ... # ...
流程
从 Kafka 集群的 <
cluster_name> -cluster-ca-certsecret 中提取集群 CA 证书和密钥。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password将 <cluster_name > 替换为 Kafka 集群的名称。当使用 Kafka 资源部署 Kafka 时,使用
Kafka集群名称(<cluster_name> -cluster-ca-cert)创建带有集群CA 证书的 secret。例如,my-cluster-cluster-ca-cert。使用 Apache Kafka 镜像的 Streams 运行一个新的交互式 pod 容器,以连接到正在运行的 Kafka 代理。
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"将 <interactive_pod_name > 替换为 pod 的名称。
将集群 CA 证书复制到交互式 pod 容器。
oc cp ca.p12 <interactive_pod_name>:/tmp从有权访问 Kafka 代理的 Kafka 用户的 secret 中提取用户 CA 证书和密钥。
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password将 <kafka_user > 替换为 Kafka 用户的名称。当使用
KafkaUser资源创建 Kafka 用户时,会使用 Kafka 用户名创建带有用户 CA 证书的 secret。例如,my-user。将用户 CA 证书复制到交互式 pod 容器。
oc cp user.p12 <interactive_pod_name>:/tmpCA 证书允许交互式 pod 容器使用 TLS 连接到 Kafka 代理。
创建
config.properties文件,以指定用于验证与 Kafka 集群连接的信任存储和密钥存储。使用您在前面的步骤中提取的证书和密码。
bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:90931 security.protocol=SSL2 ssl.truststore.location=/tmp/ca.p123 ssl.truststore.password=<truststore_password>4 ssl.keystore.location=/tmp/user.p125 ssl.keystore.password=<keystore_password>6 将
config.properties文件复制到交互式 pod 容器中。oc cp config.properties <interactive_pod_name>:/tmp/config.properties准备名为
topics.json的 JSON 文件,以指定要移动的主题。将主题名称指定为用逗号分开的列表。
重新分配
my-topic的所有分区的 JSON 文件示例{ "version": 1, "topics": [ { "topic": "my-topic"} ] }您还可以使用此文件 更改主题 的复制因素。
将
topics.json文件复制到交互式 pod 容器中。oc cp topics.json <interactive_pod_name>:/tmp/topics.json在交互式 pod 容器中启动 shell 进程。
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash将 <namespace > 替换为运行 pod 的 OpenShift 命名空间。
使用
kafka-reassign-partitions.sh命令生成重新分配 JSON。将
my-topic分区移到指定代理的示例bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate