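65.14. Batching Consumer
To use the Camel Kafka batching consumer, an application must set the batching option to true.
The received records are stored in a list inside the exchange used in the pipeline. As a result, the application can commit each record individually, or commit the whole batch at once by committing the last exchange in the list.
The size of the batch is controlled by the maxPollRecords option.
To avoid blocking for too long while waiting for a full set of records to fill the batch, use the pollTimeoutMs option to set a timeout for the polling. In this case, the batch may contain fewer messages than the number set in maxPollRecords.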
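The interaction between the batch size limit and the poll timeout can be illustrated with a minimal, Camel-independent sketch. It is not how Camel or the Kafka client implement polling; the class name BatchFillSketch and the method pollBatch are hypothetical, and an in-memory BlockingQueue stands in for the topic. The parameter names simply mirror the maxPollRecords and pollTimeoutMs options.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: an in-memory queue stands in for a Kafka topic.
final class BatchFillSketch {

    // Collects at most maxPollRecords items, but stops early once pollTimeoutMs has elapsed.
    static <T> List<T> pollBatch(BlockingQueue<T> source, int maxPollRecords, long pollTimeoutMs) {
        final List<T> batch = new ArrayList<>();
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(pollTimeoutMs);

        while (batch.size() < maxPollRecords) {
            final long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timeout reached: return whatever was collected so far
            }
            try {
                final T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing else arrived before the deadline
                }
                batch.add(item);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag and stop
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        // Only 3 records are available, so the timeout ends the batch early.
        BlockingQueue<String> few = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        System.out.println(pollBatch(few, 10, 100).size()); // prints 3

        // More records than the limit: the batch is capped at the limit.
        BlockingQueue<String> many = new LinkedBlockingQueue<>(List.of("a", "b", "c", "d", "e"));
        System.out.println(pollBatch(many, 2, 100)); // prints [a, b]
    }
}
```

The first call shows the case described above: the timeout expires before the limit is reached, so the batch is smaller than the configured maximum.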
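65.14.1. Automatic Commits
By default, Camel uses automatic commits when batching. In this case, Camel automatically commits the records after the application has processed them successfully.
If processing fails, the records are not committed. The following code provides an example of this approach: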
public void configure() {
    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);
        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }
        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
        for (Object obj : exchanges) {
            if (obj instanceof Exchange exchange) {
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}
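65.14.1.1. Handling Errors with Automatic Commits
When using automatic commits, Camel does not commit the records if processing fails. As a result, there is a risk that records are reprocessed multiple times.
It is recommended to implement appropriate error handling mechanisms and patterns (such as dead-letter queues) so that failed records do not block processing progress.
The following code provides an example of handling errors with automatic commits: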
public void configure() {
    /*
     We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
     production scenario, applications should probably send these records to a separate topic or fix the condition
     that led to the failure
     */
    onException(IllegalArgumentException.class).process(exchange -> {
        LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
        LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
    }).continued(true);
    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);
        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }
        // The records from the batch are stored in a list of exchanges in the original exchange.
        int i = 0;
        for (Object o : exchanges) {
            if (o instanceof Exchange exchange) {
                i++;
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
                // Simulate a failure on the fourth record of the batch
                if (i == 4) {
                    throw new IllegalArgumentException("Failed to process record");
                }
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}
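The dead-letter recommendation in this section can be sketched without Camel. The following is only an illustration of the pattern, not Camel's error handler: the class DeadLetterBatchSketch, the record Result, and the method processBatch are hypothetical names. Each record is processed on its own, and a record that fails is set aside instead of aborting the rest of the batch. In a real route, the set-aside records would typically be sent to a separate topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the dead-letter pattern applied to one batch.
final class DeadLetterBatchSketch {

    // Outcome of one batch: records that succeeded and records that were diverted.
    record Result<T>(List<T> processed, List<T> deadLetters) {}

    static <T> Result<T> processBatch(List<T> batch, Consumer<T> handler) {
        final List<T> processed = new ArrayList<>();
        final List<T> deadLetters = new ArrayList<>();
        for (T item : batch) {
            try {
                handler.accept(item);
                processed.add(item);
            } catch (RuntimeException ex) {
                // A failing record is diverted so the remaining records can still be handled
                deadLetters.add(item);
            }
        }
        return new Result<>(processed, deadLetters);
    }

    public static void main(String[] args) {
        // "x" cannot be parsed as an integer, so it ends up in the dead-letter list
        Result<String> result = processBatch(List.of("1", "2", "x", "4"), s -> Integer.parseInt(s));
        System.out.println(result.processed());   // prints [1, 2, 4]
        System.out.println(result.deadLetters()); // prints [x]
    }
}
```

Because no exception escapes processBatch, the batch as a whole completes, which is the condition under which an automatic commit can move past the failed record.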
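65.14.2. Manual Commits
When using batching with manual commits, it is up to the application to commit the records and to handle the outcome of potentially invalid records.
The following code provides an example of this approach: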
public void configure() {
    from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
        .process(e -> {
            // The received records are stored as exchanges in a list. This gets the list of those exchanges
            final List<?> exchanges = e.getMessage().getBody(List.class);
            // Ensure we are actually receiving what we are asking for
            if (exchanges == null || exchanges.isEmpty()) {
                return;
            }
            /*
             Every exchange in that list should contain a reference to the manual commit object. We use the reference
             for the last exchange in the list to commit the whole batch
             */
            final Object tmp = exchanges.getLast();
            if (tmp instanceof Exchange exchange) {
                KafkaManualCommit manual =
                        exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                LOG.debug("Performing manual commit");
                manual.commit();
                LOG.debug("Done performing manual commit");
            }
        });
}
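65.14.3. Dealing with Long Polling Timeouts
In some cases, applications may want the polling process to use a long timeout (see: pollTimeoutMs).
To do so properly, first make sure that the maximum polling interval is greater than the polling timeout (see: maxPollIntervalMs).
Then, increase the shutdown timeout to ensure that commits, closing, and other Kafka operations are not abruptly aborted. For example: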
public void configure() {
    // Note that this can be configured in other ways
    // The shutdown strategy timeout is expressed in seconds unless its time unit is changed
    getCamelContext().getShutdownStrategy().setTimeout(10000);
    // route setup ...
}
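The first requirement above, that maxPollIntervalMs be greater than pollTimeoutMs, can be checked before the route is created. The following is a minimal sketch under that assumption: the class LongPollConfigSketch and the method kafkaUri are hypothetical helpers, not part of Camel, and the topic name and timeout values are placeholders.

```java
// Hypothetical helper: builds a batching endpoint URI and rejects inconsistent timeouts.
final class LongPollConfigSketch {

    static String kafkaUri(String topic, long pollTimeoutMs, long maxPollIntervalMs) {
        // The maximum polling interval has to be greater than the polling timeout
        if (maxPollIntervalMs <= pollTimeoutMs) {
            throw new IllegalArgumentException(
                    "maxPollIntervalMs (" + maxPollIntervalMs
                            + ") must be greater than pollTimeoutMs (" + pollTimeoutMs + ")");
        }
        return "kafka:" + topic
                + "?batching=true"
                + "&pollTimeoutMs=" + pollTimeoutMs
                + "&maxPollIntervalMs=" + maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // prints kafka:topic?batching=true&pollTimeoutMs=30000&maxPollIntervalMs=60000
        System.out.println(kafkaUri("topic", 30_000, 60_000));
    }
}
```

The resulting string could then be passed to from(...) in a route, with the shutdown timeout raised separately as shown in the example above.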