181.6. Kafka のべき等リポジトリーの使用


Camel 2.19 から利用可能

camel-kafka ライブラリーは、Kafka トピックベースのべき等リポジトリーを提供します。このリポジトリーは、Kafka トピックでべき等状態(追加/削除)へのすべての変更をブロードキャストし、イベントソーシングによって各リポジトリーのプロセスインスタンスのローカルインメモリーキャッシュを生成します。

使用するトピックは、べき等リポジトリーインスタンスごとに一意である必要があります。このメカニズムには、トピックパーティションの数の要件はありません。リポジトリーはすべてのパーティションから同時に消費するためです。また、トピックのレプリケーション係数に関する要件はありません。

トピックを使用する各リポジトリーインスタンス(例: 通常、並列で実行されている異なるマシン上)は独自のコンシューマーグループを制御します。そのため、同じトピックを使用して 10 個の Camel プロセスのクラスターで、それぞれ独自のオフセットを制御します。

起動時に、インスタンスはトピックをサブスクライブし、オフセットを最初から戻し、キャッシュを最新の状態に再ビルドします。長さで poll DurationM の 1 回のポーリング が 0 レコードを返すまで、キャッシュは保留状態とみなされます。キャッシュが準備されるまで起動は完了しません。または 30 秒後に発生した場合は、コンシューマーがトピックの最後に追いつくまで、べき等リポジトリーが一貫性のない状態になる可能性があります。

KafkaIdempotentRepository には以下のプロパティーがあります。

Expand
プロパティー説明

topic

変更のブロードキャストに使用する Kafka トピックの名前(必須)。

bootstrapServers

内部 Kafka プロデューサーおよびコンシューマーの bootstrap.servers プロパティー。consumerConfig および producerConfig が設定されていない場合は、これを短縮として使用します。これを使用すると、このコンポーネントはプロデューサーとコンシューマーに適切なデフォルト設定を適用します。

producerConfig

変更をブロードキャストする Kafka プロデューサーによって使用されるプロパティーを設定します。bootstrapServers を上書きするため、Kafka bootstrap.servers プロパティー自体を定義する必要があります。

consumerConfig

トピックからキャッシュを設定する Kafka コンシューマーによって使用されるプロパティーを設定します。bootstrapServers を上書きするため、Kafka bootstrap.servers プロパティー自体を定義する必要があります。

maxCacheSize

メモリーに保存される、最近使用されたキーの数(デフォルトは 1000)。

pollDurationMs

Kafka コンシューマーのポーリング期間。ローカルキャッシュは即座に更新されます。この値は、トピックからキャッシュを更新する他のピアが、キャッシュアクションメッセージを送信したべき等コンシューマーインスタンスとの関連に影響します。デフォルト値は 100 ミリ秒です。
この値を明示的に設定する場合、リモートキャッシュの liveness と、このリポジトリーのコンシューマーと Kafka ブローカー間のネットワークトラフィックのボリュームにはトレードオフがあることに注意してください。キャッシュウォームアッププロセスは、何も取得しないポーリングが 1 つ存在します。これは、ストリームが現在のポイントまで消費されたことを示します。トピックでメッセージが送信されるレートにポーリング期間が過剰に長くなると、キャッシュが準備できず、追いつくまでそのピアに対して一貫性のない状態で動作する可能性があります。

リポジトリーは topic および bootstrapServers を定義してインスタンス化できます。または、producerConfig および consumerConfig プロパティーセットを明示的に定義し、SSL/SASL などの機能を有効にします。

使用するには、手動または Spring/Blueprint で Bean として登録するか、CamelContext を認識しているので、このリポジトリーを Camel レジストリーに置く必要があります。

使用例は次のとおりです。

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

XML の場合:

<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2026 Red Hat
トップに戻る