181.4. サンプル
181.4.1. Kafka からのメッセージの消費
以下は、Kafka からメッセージを読み取るために必要な最小限のルートです。
from("kafka:test?brokers=localhost:9092") .log("Message received from Kafka : ${body}") .log(" on the topic ${headers[kafka.TOPIC]}") .log(" on the partition ${headers[kafka.PARTITION]}") .log(" with the offset ${headers[kafka.OFFSET]}") .log(" with the key ${headers[kafka.KEY]}")
Kafka からメッセージを消費する場合は、独自のオフセット管理を使用し、この管理を Kafka に委譲しないでください。オフセットを保持するには、コンポーネントに File
StateRepository
などの StateRepository 実装が必要です。この Bean はレジストリーで使用できる必要があります。ここでは、この使用方法を紹介します。
// Create the repository in which the Kafka offsets will be persisted FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat")); // Bind this repository into the Camel registry JndiRegistry registry = new JndiRegistry(); registry.bind("offsetRepo", repository); // Configure the camel context DefaultCamelContext camelContext = new DefaultCamelContext(registry); camelContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + "&groupId=A" + // "&autoOffsetReset=earliest" + // Ask to start from the beginning if we have unknown offset "&offsetRepository=#offsetRepo") // Keep the offsets in the previously configured repository .to("mock:result"); } });