26.7. 샘플
26.7.1. Kafka에서 메시지 사용
Kafka에서 메시지를 읽는 데 필요한 최소 경로는 다음과 같습니다.
from("kafka:test?brokers=localhost:9092") .log("Message received from Kafka : ${body}") .log(" on the topic ${headers[kafka.TOPIC]}") .log(" on the partition ${headers[kafka.PARTITION]}") .log(" with the offset ${headers[kafka.OFFSET]}") .log(" with the key ${headers[kafka.KEY]}")
여러 주제에서 메시지를 사용해야 하는 경우 쉼표로 구분된 주제 이름 목록을 사용할 수 있습니다.
from("kafka:test,test1,test2?brokers=localhost:9092") .log("Message received from Kafka : ${body}") .log(" on the topic ${headers[kafka.TOPIC]}") .log(" on the partition ${headers[kafka.PARTITION]}") .log(" with the offset ${headers[kafka.OFFSET]}") .log(" with the key ${headers[kafka.KEY]}")
또한 주제 이름으로 패턴을 제공하고 topicIsPattern
옵션을 사용하여 여러 항목에 등록할 수도 있습니다.
from("kafka:test*?brokers=localhost:9092&topicIsPattern=true") .log("Message received from Kafka : ${body}") .log(" on the topic ${headers[kafka.TOPIC]}") .log(" on the partition ${headers[kafka.PARTITION]}") .log(" with the offset ${headers[kafka.OFFSET]}") .log(" with the key ${headers[kafka.KEY]}")
Kafka에서 메시지를 사용할 때 자체 오프셋 관리를 사용할 수 있으며 이 관리를 Kafka에 위임하지 않습니다. 오프셋을 유지하려면 구성 요소에는 File
StateRepository
과 같은 StateRepository 구현이 필요합니다. 이 빈은 레지스트리에서 사용할 수 있어야 합니다. 여기에 그것을 사용하는 방법:
// Create the repository in which the Kafka offsets will be persisted FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat")); // Bind this repository into the Camel registry Registry registry = createCamelRegistry(); registry.bind("offsetRepo", repository); // Configure the camel context DefaultCamelContext camelContext = new DefaultCamelContext(registry); camelContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + // Setup the topic and broker address "&groupId=A" + // The consumer processor group ID "&autoOffsetReset=earliest" + // Ask to start from the beginning if we have unknown offset "&offsetRepository=#offsetRepo") // Keep the offsets in the previously configured repository .to("mock:result"); } });