66.15. 自定义订阅适配器
具有复杂订阅逻辑的应用程序可能会提供一个自定义 Bean 来处理订阅过程。为此,需要实施接口 SubscribeAdapter
。
订阅一组 Kafka 主题或模式的订阅者适配器示例
public class CustomSubscribeAdapter implements SubscribeAdapter { @Override public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) { if (topicInfo.isPattern()) { consumer.subscribe(topicInfo.getPattern(), reBalanceListener); } else { consumer.subscribe(topicInfo.getTopics(), reBalanceListener); } } }
public class CustomSubscribeAdapter implements SubscribeAdapter {
@Override
public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) {
if (topicInfo.isPattern()) {
consumer.subscribe(topicInfo.getPattern(), reBalanceListener);
} else {
consumer.subscribe(topicInfo.getTopics(), reBalanceListener);
}
}
}
然后,需要将名为 bean 实例添加到 registry 中:
添加到 registry 示例
context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());