191.5. Samples
191.5.1. Consuming messages from Kafka
The following is the minimal route needed to read messages from Kafka.
    from("kafka:test?brokers=localhost:9092")
        .log("Message received from Kafka : ${body}")
        .log(" on the topic ${headers[kafka.TOPIC]}")
        .log(" on the partition ${headers[kafka.PARTITION]}")
        .log(" with the offset ${headers[kafka.OFFSET]}")
        .log(" with the key ${headers[kafka.KEY]}");
To consume messages from multiple topics, use a comma-separated list of topic names.
    from("kafka:test,test1,test2?brokers=localhost:9092")
        .log("Message received from Kafka : ${body}")
        .log(" on the topic ${headers[kafka.TOPIC]}")
        .log(" on the partition ${headers[kafka.PARTITION]}")
        .log(" with the offset ${headers[kafka.OFFSET]}")
        .log(" with the key ${headers[kafka.KEY]}");
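When the topic names are only known at runtime, the comma-separated endpoint URI can be assembled programmatically. The following is a minimal sketch using only the Java standard library; the class KafkaUriSketch and the method consumerUri are hypothetical names chosen for illustration and are not part of Camel.

```java
import java.util.List;

// Hypothetical helper, not part of Camel: builds a Kafka consumer endpoint
// URI in the same shape as the routes shown in this section.
final class KafkaUriSketch {

    static String consumerUri(List<String> topics, String brokers) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("at least one topic is required");
        }
        // Topic names go in the path part of the URI, separated by commas.
        return "kafka:" + String.join(",", topics) + "?brokers=" + brokers;
    }

    public static void main(String[] args) {
        System.out.println(
                consumerUri(List.of("test", "test1", "test2"), "localhost:9092"));
    }
}
```

The resulting string can be passed to from(...) in a route in place of a literal URI.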
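When consuming messages from Kafka, you can manage offsets yourself instead of delegating offset management to Kafka. To persist the offsets, the component needs a StateRepository implementation, such as FileStateRepository. This bean must be available in the registry. Here is how to use it: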
    // Create the repository in which the Kafka offsets will be persisted
    FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));

    // Bind this repository into the Camel registry
    JndiRegistry registry = new JndiRegistry();
    registry.bind("offsetRepo", repository);

    // Configure the camel context
    DefaultCamelContext camelContext = new DefaultCamelContext(registry);
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + // Setup the topic and broker address
                    "&groupId=A" +                     // The consumer processor group ID
                    "&autoOffsetReset=earliest" +      // Ask to start from the beginning if we have unknown offset
                    "&offsetRepository=#offsetRepo")   // Keep the offsets in the previously configured repository
                .to("mock:result");
        }
    });
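To make the role of the offset repository more concrete, the following sketch shows the general idea of a file-backed key/value store that survives a restart. It is an illustration only and is not Camel's FileStateRepository: the class OffsetStoreSketch, the "topic/partition" key layout, and the use of java.util.Properties as the file format are all assumptions made for this example. Only the method names setState and getState are meant to mirror the StateRepository interface.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Illustrative only: a tiny file-backed store, NOT Camel's FileStateRepository.
final class OffsetStoreSketch {
    private final Path file;
    private final Properties state = new Properties();

    OffsetStoreSketch(Path file) throws IOException {
        this.file = file;
        // Reload whatever a previous run persisted.
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                state.load(in);
            }
        }
    }

    // Record a value and write the whole state back to disk.
    void setState(String key, String value) throws IOException {
        state.setProperty(key, value);
        try (OutputStream out = Files.newOutputStream(file)) {
            state.store(out, null);
        }
    }

    // Returns null when nothing has been stored for this key.
    String getState(String key) {
        return state.getProperty(key);
    }

    // Assumed key layout for this sketch: one entry per topic partition.
    static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    // Stores an offset, simulates a restart, and returns the recovered value.
    static String restartDemo() {
        try {
            Path file = Files.createTempFile("offsets", ".dat");
            new OffsetStoreSketch(file).setState(key("test", 0), "42");
            String recovered = new OffsetStoreSketch(file).getState(key("test", 0));
            Files.deleteIfExists(file);
            return recovered;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(restartDemo());
    }
}
```

The second OffsetStoreSketch instance reads the offset written by the first one. In the same way, a consumer configured with offsetRepository is expected to resume from persisted offsets rather than from the position chosen by autoOffsetReset.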
191.5.2. Producing messages to Kafka
The following is the minimal route needed to write messages to Kafka.
    from("direct:start")
        .setBody(constant("Message from Camel"))          // Message to send
        .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
        .to("kafka:test?brokers=localhost:9092");