6.3. 将合作重新平衡与消费者一起使用
Kafka 用户使用由重新平衡协议决定的分区分配策略。默认情况下,Kafka 使用 RangeAssignor
协议,它涉及用户在重新平衡过程中重新精简其分区分配,从而导致潜在的服务中断。
为提高效率并缩短停机时间,您可以切换到 CooperativeStickyAssignor
协议,这是一个合作重新平衡方法。与默认协议不同,合作重新平衡使消费者能够一起工作,在重新平衡期间保留其分区分配,只有在需要在消费者组中实现平衡时才释放分区。
流程
在消费者配置中,使用
partition.assignment.strategy
属性切换到使用CooperativeStickyAssignor
作为协议。例如,如果当前配置是partition.assignment.strategy=RangeAssignor, CooperativeStickyAssignor
,请将其更新为partition.assignment.strategy=CooperativeStickyAssignor
。您还可以使用消费者应用程序代码中的
props.put
设置分区分配策略,而不是直接修改消费者配置文件:... ...
# ... props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); # ...
Copy to Clipboard Copied! - 一次重启组中的每个消费者,允许它们在每次重启后重新加入组。
切换到 CooperativeStickyAssignor
协议后,可能会在消费者重新平衡过程中发生 RebalanceInProgressException
,从而导致同一消费者组中的多个 Kafka 客户端的意外停止页面。另外,这个问题可能会导致意外的消息重复,即使 Kafka 用户在重新平衡过程中没有更改其分区分配。如果您使用自动偏移提交(enable.auto.commit=true
),则不需要进行任何更改。如果您要手动提交偏移(enable.auto.commit=false
),并在手动提交过程中发生 RebalanceInProgressException
,请将消费者实施更改为下一循环中调用 poll ()
以完成消费者重新平衡过程。如需更多信息,请参阅 客户门户网站中的合作StickyAssignor
文章。