54.10. Kafka べき等リポジトリーの使用


camel-kafka ライブラリーは、Kafka トピックベースのべき等リポジトリーを提供します。

このリポジトリーは、べき等状態 (追加/削除) へのブロードキャストのすべての変更を Kafka トピックに保存し、イベントソーシングを通じて各リポジトリーのプロセスインスタンスのローカルインメモリーキャッシュにデータを取り込みます。使用されるトピックは、べき等リポジトリーインスタンスごとに一意である必要があります。

このメカニズムには、トピックパーティションの数に関する要件はありません。リポジトリーがすべてのパーティションから同時に消費するためです。また、トピックのレプリケーションファクターに関する要件もありません。

トピックを使用する各リポジトリーインスタンス (通常、並行して実行されている異なるマシン上) は独自の consumer グループを制御するため、同じトピックを使用する 10 個の Camel プロセスのクラスターでは、それぞれが独自のオフセットを制御します。

起動時に、インスタンスはトピックにサブスクライブし、オフセットを先頭に巻き戻し、キャッシュを最新の状態に再構築します。pollDurationMs の長さの 1 つのポーリングで 0 レコードが返されるまで、キャッシュはウォームアップされたと見なされません。キャッシュがウォームアップするか、30 秒経過するまで、起動は完了しません。後者の場合、consumer がトピックの最後に追いつくまで、冪等リポジトリーは一貫性のない状態になる可能性があります。

一意性チェックに使用されるヘッダーの形式に注意してください。デフォルトでは、文字列をデータ型として使用します。プリミティブな数値形式を使用する場合、それに応じてヘッダーを逆シリアル化する必要があります。例については、以下のサンプルを確認してください。

KafkaIdempotentRepository には次のプロパティーがあります。

Expand
プロパティー説明

topic

変更をブロードキャストするために使用する Kafka トピックの名前。(必要)

bootstrapServers

内部 Kafka producer および consumer の bootstrap.servers プロパティー。consumerConfigproducerConfig を設定しない場合は、これを省略形として使用します。このコンポーネントを使用すると、producer と consumer に適切なデフォルト設定が適用されます。

producerConfig

変更をブロードキャストする Kafka producer によって使用されるプロパティーを設定します。bootstrapServers をオーバーライドするため、Kafka の bootstrap.servers プロパティー自体を定義する必要があります

consumerConfig

トピックからキャッシュを設定する Kafka consumer によって使用されるプロパティーを設定します。bootstrapServers をオーバーライドするため、Kafka の bootstrap.servers プロパティー自体を定義する必要があります

maxCacheSize

最近使用したキーをメモリーに保存する数 (デフォルトは 1000)。

pollDurationMs

Kafka consumer のポーリング期間。ローカルキャッシュはすぐに更新されます。この値は、トピックからキャッシュを更新する他のピアが、キャッシュアクションメッセージを送信した冪等の consumer インスタンスと比べてどれだけ遅れているかに影響します。このデフォルト値は 100 ミリ秒です。
この値を明示的に設定する場合は、リモートキャッシュのライブ性と、このリポジトリーの consumer と Kafka ブローカー間のネットワークトラフィックの量との間にトレードオフがあることに注意してください。キャッシュウォームアッププロセスは、何も取得しない 1 つのポーリングがあることにも依存します。これは、ストリームが現在の時点まで消費されたことを示します。トピックでメッセージが送信される速度に対してポーリング期間が長すぎる場合、キャッシュがウォームアップできず、キャッシュが追いつくまでピアに対して一貫性のない状態で動作する可能性があります。

topicbootstrapServers を定義してリポジトリーをインスタンス化するか、producerConfig および consumerConfig プロパティーセットを明示的に定義して、SSL/SASL などの機能を有効にすることができます。使用するには、CamelContext に対応しているため、手動で、または Spring/Blueprint で Bean として登録することにより、このリポジトリーを Camel レジストリーに配置する必要があります。

使用例は次のとおりです。

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()
Copy to Clipboard Toggle word wrap

XML の場合:

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>
Copy to Clipboard Toggle word wrap

数値識別子でべき等性を使用する場合は、3 つの選択肢から選択できます。1 つ目は、org.apache.camel.component.kafka.serde.KafkaSerdeHelper の静的メソッド numericHeader メソッドを使用して変換を実行することです。

from("direct:performInsert")
    .idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()
Copy to Clipboard Toggle word wrap

または、ルート URL を介して設定されたカスタムシリアライザーを使用して変換を実行することもできます。

public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);

    @Override
    public Object deserialize(String key, byte[] value) {
        if (key.equals("id")) {
            BigInteger bi = new BigInteger(value);

            return String.valueOf(bi.longValue());
        } else {
            return super.deserialize(key, value);
        }
    }
}
Copy to Clipboard Toggle word wrap

最後に、プロセッサーでこれを行うこともできます。

from(from).routeId("foo")
    .process(exchange -> {
        byte[] id = exchange.getIn().getHeader("id", byte[].class);

        BigInteger bi = new BigInteger(id);
        exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
    })
    .idempotentConsumer(header("id"))
    .messageIdRepositoryRef("kafkaIdempotentRepository")
    .to(to);
Copy to Clipboard Toggle word wrap
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat