27.9. Kafka べき等リポジトリーの使用
camel-kafka ライブラリーは、Kafka トピックベースのべき等リポジトリーを提供します。
このリポジトリーは、べき等状態 (追加/削除) へのブロードキャストのすべての変更を Kafka トピックに保存し、イベントソーシングを通じて各リポジトリーのプロセスインスタンスのローカルインメモリーキャッシュにデータを取り込みます。使用されるトピックは、べき等リポジトリーインスタンスごとに一意である必要があります。
このメカニズムには、トピックパーティションの数に関する要件はありません。リポジトリーがすべてのパーティションから同時に消費するためです。また、トピックのレプリケーションファクターに関する要件もありません。
トピックを使用する各リポジトリーインスタンス (通常、並行して実行されている異なるマシン上) は独自の consumer グループを制御するため、同じトピックを使用する 10 個の Camel プロセスのクラスターでは、それぞれが独自のオフセットを制御します。
起動時に、インスタンスはトピックにサブスクライブし、オフセットを先頭に巻き戻し、キャッシュを最新の状態に再構築します。pollDurationMs の長さの 1 つのポーリングで 0 レコードが返されるまで、キャッシュはウォームアップされたと見なされません。キャッシュがウォームアップするか、30 秒経過するまで、起動は完了しません。後者の場合、consumer がトピックの最後に追いつくまで、冪等リポジトリーは一貫性のない状態になる可能性があります。
一意性チェックに使用されるヘッダーの形式に注意してください。デフォルトでは、文字列をデータ型として使用します。プリミティブな数値形式を使用する場合、それに応じてヘッダーを逆シリアル化する必要があります。例については、以下のサンプルを確認してください。
KafkaIdempotentRepository には次のプロパティーがあります。
| プロパティー | 説明 |
|---|---|
|
| 変更をブロードキャストするために使用する Kafka トピックの名前。(必要) |
|
|
内部 Kafka producer および consumer の |
|
|
変更をブロードキャストする Kafka producer によって使用されるプロパティーを設定します。 |
|
|
トピックからキャッシュを設定する Kafka consumer によって使用されるプロパティーを設定します。 |
|
| 最近使用したキーをメモリーに保存する数 (デフォルトは 1000)。 |
|
|
Kafka consumer のポーリング期間。ローカルキャッシュはすぐに更新されます。この値は、トピックからキャッシュを更新する他のピアが、キャッシュアクションメッセージを送信した冪等の consumer インスタンスと比べてどれだけ遅れているかに影響します。このデフォルト値は 100 ミリ秒です。 |
topic と bootstrapServers を定義してリポジトリーをインスタンス化するか、または producerConfig および consumerConfig プロパティーセットを明示的に定義して、SSL/SASL などの機能を有効にすることができます。使用するには、CamelContext に対応しているため、手動で、または Spring/Blueprint で Bean として登録することにより、このリポジトリーを Camel レジストリーに配置する必要があります。
使用例は次のとおりです。
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
XML の場合:
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>
数値識別子でべき等性を使用する場合は、3 つの選択肢から選択できます。1 つ目は、org.apache.camel.component.kafka.serde.KafkaSerdeHelper の静的メソッド numericHeader メソッドを使用して変換を実行することです。
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
または、ルート URL を介して設定されたカスタムシリアライザーを使用して変換を実行することもできます。
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
最後に、プロセッサーでこれを行うこともできます。
from(from).routeId("foo")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to(to);