181.6. Kafka のべき等リポジトリーの使用
Camel 2.19 から利用可能
camel-kafka ライブラリーは、Kafka トピックベースのべき等リポジトリーを提供します。このリポジトリーは、Kafka トピックでべき等状態(追加/削除)へのすべての変更をブロードキャストし、イベントソーシングによって各リポジトリーのプロセスインスタンスのローカルインメモリーキャッシュを生成します。
使用するトピックは、べき等リポジトリーインスタンスごとに一意である必要があります。このメカニズムには、トピックパーティションの数の要件はありません。リポジトリーはすべてのパーティションから同時に消費するためです。また、トピックのレプリケーション係数に関する要件はありません。
トピックを使用する各リポジトリーインスタンス(例: 通常、並列で実行されている異なるマシン上)は独自のコンシューマーグループを制御します。そのため、同じトピックを使用して 10 個の Camel プロセスのクラスターで、それぞれ独自のオフセットを制御します。
起動時に、インスタンスはトピックをサブスクライブし、オフセットを最初から戻し、キャッシュを最新の状態に再ビルドします。長さで poll DurationM の 1 回のポーリング が 0 レコードを返すまで、キャッシュは保留状態とみなされます。キャッシュが準備されるまで起動は完了しません。または 30 秒後に発生した場合は、コンシューマーがトピックの最後に追いつくまで、べき等リポジトリーが一貫性のない状態になる可能性があります。
KafkaIdempotentRepository には以下のプロパティーがあります。
| プロパティー | 説明 |
|---|---|
|
| 変更のブロードキャストに使用する Kafka トピックの名前(必須)。 |
|
|
内部 Kafka プロデューサーおよびコンシューマーの |
|
|
変更をブロードキャストする Kafka プロデューサーによって使用されるプロパティーを設定します。 |
|
|
トピックからキャッシュを設定する Kafka コンシューマーによって使用されるプロパティーを設定します。 |
|
| メモリーに保存される、最近使用されたキーの数(デフォルトは 1000)。 |
|
|
Kafka コンシューマーのポーリング期間。ローカルキャッシュは即座に更新されます。この値は、トピックからキャッシュを更新する他のピアが、キャッシュアクションメッセージを送信したべき等コンシューマーインスタンスとの関連に影響します。デフォルト値は 100 ミリ秒です。 |
リポジトリーは topic および bootstrapServers を定義してインスタンス化できます。または、producerConfig および consumerConfig プロパティーセットを明示的に定義し、SSL/SASL などの機能を有効にします。
使用するには、手動または Spring/Blueprint で Bean として登録するか、CamelContext を認識しているので、このリポジトリーを Camel レジストリーに置く必要があります。
使用例は次のとおりです。
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
XML の場合:
<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>