29.7. Samples


29.7.1. 消耗 Kafka 中的信息

以下是从 Kafka 读取信息所需的最小路由。

from("kafka:test?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

如果您需要使用来自多个主题的消息,您可以使用用逗号分开的主题名称列表。

from("kafka:test,test1,test2?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

您还可以订阅多个主题,为主题名称提供模式,并使用 topicIsPattern 选项。

from("kafka:test*?brokers=localhost:9092&topicIsPattern=true")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

当消耗 Kafka 的消息时,您可以使用自己的偏移管理,而不将此管理委派给 Kafka。为了保持偏移组件的偏移,组件需要 StateRepository 实现,如 FileStateRepository。此 bean 应该在注册表中提供。如何使用它:

// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));

// Bind this repository into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);

// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                     // Setup the topic and broker address
                     "&groupId=A" +
                     // The consumer processor group ID
                     "&autoOffsetReset=earliest" +
                     // Ask to start from the beginning if we have unknown offset
                     "&offsetRepository=#offsetRepo")
                     // Keep the offsets in the previously configured repository
                .to("mock:result");
    }
});
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.