190.5. サンプル


190.5.1. Kafka からのメッセージの消費

Kafka からメッセージを読み取るために必要な最小限のルートを次に示します。

from("kafka:test?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

複数のトピックからのメッセージを消費する必要がある場合は、トピック名のコンマ区切りリストを使用できます

from("kafka:test,test1,test2?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log("    on the topic ${headers[kafka.TOPIC]}")
    .log("    on the partition ${headers[kafka.PARTITION]}")
    .log("    with the offset ${headers[kafka.OFFSET]}")
    .log("    with the key ${headers[kafka.KEY]}")

Kafka からのメッセージを消費する場合、独自のオフセット管理を使用でき、この管理を Kafka に委任する必要はありません。オフセットを保持するために、コンポーネントには FileStateRepository などの StateRepository 実装が必要です。この Bean は、レジストリーで使用できるはずです。ここでそれを使用する方法:

// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));

// Bind this repository into the Camel registry
JndiRegistry registry = new JndiRegistry();
registry.bind("offsetRepo", repository);

// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                     // Setup the topic and broker address
                     "&groupId=A" +
                     // The consumer processor group ID
                     "&autoOffsetReset=earliest" +
                     // Ask to start from the beginning if we have unknown offset
                     "&offsetRepository=#offsetRepo")
                     // Keep the offsets in the previously configured repository
                .to("mock:result");
    }
});

 

190.5.2. Kafka へのメッセージの生成

Kafka にメッセージを書き込むために必要な最小限のルートを次に示します。

from("direct:start")
    .setBody(constant("Message from Camel"))          // Message to send
    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
    .to("kafka:test?brokers=localhost:9092");
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.