191.5. Samples
191.5.1. Kafka에서 메시지 사용
다음은 Kafka에서 메시지를 읽는 데 필요한 최소 경로입니다.
from("kafka:test?brokers=localhost:9092") .log("Message received from Kafka : ${body}") .log(" on the topic ${headers[kafka.TOPIC]}") .log(" on the partition ${headers[kafka.PARTITION]}") .log(" with the offset ${headers[kafka.OFFSET]}") .log(" with the key ${headers[kafka.KEY]}")
여러 주제의 메시지를 사용해야 하는 경우 쉼표로 구분된 주제 이름 목록을 사용할 수 있습니다.
from("kafka:test,test1,test2?brokers=localhost:9092") .log("Message received from Kafka : ${body}") .log(" on the topic ${headers[kafka.TOPIC]}") .log(" on the partition ${headers[kafka.PARTITION]}") .log(" with the offset ${headers[kafka.OFFSET]}") .log(" with the key ${headers[kafka.KEY]}")
Kafka의 메시지를 사용하는 경우 자체 오프셋 관리를 사용하고 이 관리를 Kafka에 위임할 수 없습니다. 오프셋을 유지하려면 구성 요소에 File
와 같은 StateRepository 구현이 필요합니다. 이 8080은 레지스트리에서 사용할 수 있어야 합니다. 여기에 사용 방법:
StateRepository
// Create the repository in which the Kafka offsets will be persisted FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat")); // Bind this repository into the Camel registry JndiRegistry registry = new JndiRegistry(); registry.bind("offsetRepo", repository); // Configure the camel context DefaultCamelContext camelContext = new DefaultCamelContext(registry); camelContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + // Setup the topic and broker address "&groupId=A" + // The consumer processor group ID "&autoOffsetReset=earliest" + // Ask to start from the beginning if we have unknown offset "&offsetRepository=#offsetRepo") // Keep the offsets in the previously configured repository .to("mock:result"); } });
191.5.2. Kafka로 메시지 생성
다음은 Kafka에 메시지를 작성하는 데 필요한 최소 경로입니다.
from("direct:start") .setBody(constant("Message from Camel")) // Message to send .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message .to("kafka:test?brokers=localhost:9092");