15.3. 在删除代理前重新分配分区
在减少 Kafka 集群中的代理数量前,使用 kafka-reassign-partitions.sh 工具生成的重新分配文件来重新分配分区。重新分配文件必须描述如何将分区重新分配给 Kafka 集群中的剩余代理。您可以将文件中指定的重新分配应用到代理,然后验证新分区分配。首先删除编号最高的 pod 中的代理。
此流程描述了使用 TLS 的安全扩展过程。您需要一个使用 TLS 加密和 mTLS 身份验证的 Kafka 集群。
虽然您可以使用 kafka-reassign-partitions.sh 工具,但建议使用 Cruise Control 进行自动分区重新分配和集群重新平衡。Cruise Control 可以在不停机的情况下将主题从一个代理移到另一个代理,这是重新分配分区的最高效方法。
先决条件
- 现有的 Kafka 集群。
您已创建了 JSON 文件,以指定如何将分区重新分配给减少集群中的代理。
在此过程中,我们为名为
my-topic的主题重新分配所有分区。名为topics.json的 JSON 文件指定主题,用于生成reassignment.json文件。
示例 JSON 文件指定 my-topic
流程
如果没有这样做,请使用
kafka-reassign-partitions.sh工具生成名为reassignment.json的重新分配 JSON 文件。生成重新分配 JSON 文件的命令示例
/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --topics-to-move-json-file topics.json \ --broker-list 0,1,2,3 \ --generate
/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --topics-to-move-json-file topics.json \1 --broker-list 0,1,2,3 \2 --generateCopy to Clipboard Copied! Toggle word wrap Toggle overflow 显示当前和提议的副本分配的 JSON 文件示例
Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]}]}Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]}]}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果您需要稍后恢复更改,请将此文件的副本保存到本地。
使用
--execute选项运行分区重新分配。/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --execute
/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 如果要节流复制,您还可以通过
--throttle选项,并传递一个 inter-broker 节流率(以字节/秒为单位)。例如:/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --throttle 5000000 \ --execute
/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --throttle 5000000 \ --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用
--verify选项验证重新分配是否已完成。/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --verify
/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow 当
--verify命令报告正在移动的每个分区都成功完成时,重新分配已完成。最后--verify也具有删除任何重新分配节流的影响。检查正在删除的每个代理是否没有任何实时分区(
log.dirs)。ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果日志目录与正则表达式
\.[a-z0-9]-delete$不匹配,则活动分区仍然存在。如果您有活跃的分区,检查重新分配已完成,或者在重新分配 JSON 文件中的配置。您可以再次运行重新分配。在继续下一步之前,请确保没有活动的分区。停止代理。
su - kafka /opt/kafka/bin/kafka-server-stop.sh
su - kafka /opt/kafka/bin/kafka-server-stop.shCopy to Clipboard Copied! Toggle word wrap Toggle overflow 确认 Kafka 代理已停止。
jcmd | grep kafka
jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow