65.7. 使用方法
65.7.1. 消费者错误处理 复制链接链接已复制到粘贴板!
虽然 kafka 使用者是从 kafka 代理轮询消息,但可能会出现错误。本节描述了发生什么情况,以及您可以配置的内容。
在调用 Kafka 轮询 API 时,使用者可能会抛出异常。例如,如果因为无效数据导致消息无法序列化,并且许多其他类型的错误。这些错误采用 KafkaException 的形式,可以是 retry 或 not。可以重试的例外情况(RetriableException)将再次重试(在两者之间进行轮询超时)。所有其他异常会根据 pollOnError 配置进行处理。此配置具有以下值:
- DISCARD 将丢弃消息,并继续轮询下一个消息。
- ERROR_HANDLER 将使用 Camel 的错误处理程序来处理异常,之后继续轮询下一个消息。
- RECONNECT 将重新连接消费者,并尝试再次轮询消息。
- RETRY 将使消费者重试轮询相同的消息
- STOP 将停止消费者(如果消费者应能够再次使用消息,则必须手动启动/重新启动)。
默认为 ERROR_HANDLER,它将让 Camel 的错误处理程序(如果有任何配置)处理导致异常。然后继续轮询下一个消息。此行为与 Camel 组件具有的 bridgeErrorHandler 选项类似。
对于高级控制,可以在组件级别上配置 org.apache.camel.component.kafka.PollExceptionStrategy 的自定义实现,允许控制每个异常要使用的策略。
65.7.2. 消费者错误处理(advanced) 复制链接链接已复制到粘贴板!
默认情况下,Camel 将使用 ERROR_HANDLER 轮询来处理异常。Camel 如何使用配置中的 breakOnFirstError 属性处理导致异常的消息。Camel 将不继续轮询下一个消息,而是提交偏移,以便重试导致异常的消息。这与上面的 RETRY 轮询策略类似。
KafkaComponent kafka = new KafkaComponent();
kafka.setBreakOnFirstError(true);
...
camelContext.addComponent("kafka", kafka);
建议您阅读下面的"使用 Kafka 使用者手动提交"的部分,以了解 breakOnFirstError 如何根据配置的 CommitManager 工作。