191.7. Kafka べき等リポジトリーの使用
Camel 2.19 から利用可能
camel-kafka
ライブラリーは、Kafka トピックベースのべき等リポジトリーを提供します。このリポジトリーは、べき等状態 (追加/削除) へのブロードキャストのすべての変更を Kafka トピックに保存し、イベントソーシングを通じて各リポジトリーのプロセスインスタンスのローカルインメモリーキャッシュにデータを取り込みます。
使用されるトピックは、べき等リポジトリーインスタンスごとに一意である必要があります。このメカニズムには、トピックパーティションの数に関する要件はありません。リポジトリーがすべてのパーティションから同時に消費するためです。また、トピックのレプリケーションファクターに関する要件もありません。
トピックを使用する各リポジトリーインスタンス (通常、並行して実行されている異なるマシン上) は独自の consumer グループを制御するため、同じトピックを使用する 10 個の Camel プロセスのクラスターでは、それぞれが独自のオフセットを制御します。
起動時に、インスタンスはトピックにサブスクライブし、オフセットを先頭に巻き戻し、キャッシュを最新の状態に再構築します。pollDurationMs
の長さの 1 つのポーリングで 0 レコードが返されるまで、キャッシュはウォームアップされたと見なされません。キャッシュがウォームアップするか、30 秒経過するまで、起動は完了しません。後者の場合、コンシューマーがトピックの最後に追いつくまで、冪等リポジトリーは一貫性のない状態になる可能性があります。
KafkaIdempotentRepository
には次のプロパティーがあります。
プロパティー | 説明 |
---|---|
| 変更をブロードキャストするために使用する Kafka トピックの名前。(必要) |
|
内部 Kafka producer および consumer の |
|
変更をブロードキャストする Kafka producer によって使用されるプロパティーを設定します。 |
|
トピックからキャッシュを設定する Kafka consumer によって使用されるプロパティーを設定します。 |
| 最近使用したキーをメモリーに保存する数 (デフォルトは 1000)。 |
|
Kafka consumer のポーリング期間。ローカルキャッシュはすぐに更新されます。この値は、トピックからキャッシュを更新する他のピアが、キャッシュアクションメッセージを送信した冪等の consumer インスタンスと比べてどれだけ遅れているかに影響します。このデフォルト値は 100 ミリ秒です。 |
topic
と bootstrapServers
を定義してリポジトリーをインスタンス化するか、producerConfig
および consumerConfig
プロパティーセットを明示的に定義して、SSL/SASL などの機能を有効にすることができます。
使用するには、CamelContext
に対応しているため、手動で、または Spring/Blueprint で Bean として登録することにより、このリポジトリーを Camel レジストリーに配置する必要があります。
使用例は次のとおりです。
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091"); SimpleRegistry registry = new SimpleRegistry(); registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext CamelContext context = new CamelContext(registry); // later in RouteBuilder... from("direct:performInsert") .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo") // once-only insert into database .end()
XML の場合:
<!-- simple --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="bootstrapServers" value="localhost:9091"/> </bean> <!-- complex --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="maxCacheSize" value="10000"/> <property name="consumerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> <property name="producerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> </bean>