34.7. Samples
34.7.1. 使用 Kafka 的消息 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
以下是从 Kafka 读取信息所需的最小路由。
from("kafka:test?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
如果您需要使用来自多个主题的消息,您可以使用逗号分隔的主题名称列表。
from("kafka:test,test1,test2?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
也可以订阅多个主题,为主题名称提供模式,并使用 topicIsPattern 选项。
from("kafka:test*?brokers=localhost:9092&topicIsPattern=true")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
使用 Kafka 中的消息时,您可以使用自己的偏移管理,且不会将这个管理委派给 Kafka。为了保持偏移,组件需要 StateRepository 实施,如 FileStateRepository。此 bean 应该在 registry 中提供。在这里如何使用它:
// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
// Bind this repository into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);
// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
// Setup the topic and broker address
"&groupId=A" +
// The consumer processor group ID
"&autoOffsetReset=earliest" +
// Ask to start from the beginning if we have unknown offset
"&offsetRepository=#offsetRepo")
// Keep the offsets in the previously configured repository
.to("mock:result");
}
});
34.7.2. 生成信息到 Kafka 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
以下是向 Kafka 写入信息所需的最小路由。
from("direct:start")
.setBody(constant("Message from Camel")) // Message to send
.setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
.to("kafka:test?brokers=localhost:9092");