11.8. Idempotent Consumer
概要 リンクのコピーリンクがクリップボードにコピーされました!
Idempotent Consumer パターンは、重複したメッセージをフィルターするために使用され ます。たとえば、システム障害により、メッセージングシステムとコンシューマーエンドポイント間の接続が突然失われるシナリオについて考えてみましょう。メッセージングシステムがメッセージの送信中だった場合は、コンシューマーが最後のメッセージを受け取ったかどうかが不明な可能性があります。配信の信頼性を向上させるために、メッセージングシステムは、接続が再確立され次第、メッセージを再配信することを決定する場合があります。ただし、これにより、コンシューマーが重複したメッセージを受信する可能性があり、場合によっては、メッセージの重複により、望ましくない結果 (口座から 2 回お金が引き落とされるなど) となる可能性があります。このシナリオでは、メッセージストリームから不必要な重複を取り除くために、べき等コンシューマーを使用します。
Camel は以下のべき等コンシューマー実装を提供します。
-
MemoryIdempotentRepository
- KafkaIdempotentRepository
- File
- Hazelcast
- SQL
- JPA
インメモリーキャッシュを持つべき等コンシューマー リンクのコピーリンクがクリップボードにコピーされました!
Apache Camel では、Idempotent Consumer パターンが idempotentConsumer()
プロセッサーによって実装されます。これには、以下の 2 つの引数を持ちます。
-
messageIdExpression
: 現在のメッセージのメッセージ ID 文字列を返す式。 -
messageIdRepository
: 受信したすべてのメッセージの ID を格納するメッセージ ID リポジトリーへの参照。
各メッセージを受信すると、べき等コンシューマープロセッサーは、リポジトリーの現在のメッセージ ID を検索し、このメッセージが以前にあったものかを確認します。yes の場合、メッセージは破棄されます。no の場合、メッセージは渡され、その ID がリポジトリーに追加されます。
例11.1「インメモリーキャッシュを使用した重複メッセージのフィルタリング」 に表示されているコードは、TransactionID
ヘッダーを使用して重複をフィルタリングします。
例11.1 インメモリーキャッシュを使用した重複メッセージのフィルタリング
memoryMessageIdRepository(200)
が呼び出されると、最大 200 個のメッセージ ID を保持できるインメモリーキャッシュが作成されます。
XML 設定を使用してべき等コンシューマーを定義することもできます。たとえば、以下のように XML で前述のルートを定義できます。
Camel 2.17 より、べき等リポジトリーはオプションのシリアライズされたヘッダーをサポートします。
JPA リポジトリーを使用したべき等コンシューマー リンクのコピーリンクがクリップボードにコピーされました!
インメモリーキャッシュは、メモリー不足になりやすく、クラスター環境で機能しないという欠点があります。これらの欠点に対処するために、代わりに Java Persistent API (JPA) ベースのリポジトリーを使用できます。JPA メッセージ ID リポジトリーは、オブジェクト指向のデータベースを使用してメッセージ ID を保存します。たとえば、以下のように、べき等コンシューマーの JPA リポジトリーを使用するルートを定義できます。
JPA メッセージ ID リポジトリーは 2 つの引数で初期化されます。
-
JpaTemplate
インスタンス: JPA データベースのハンドルを指定します。 - プロセッサー名: 現在のべき等コンシューマープロセッサーを特定します。
SpringRouteBuilder.bean()
メソッドは、Spring XML ファイルで定義された Bean を参照するショートカットです。JpaTemplate
Bean は、基礎となる JPA データベースに対するハンドルを提供します。この Bean の設定方法に関する詳細は、JPA のドキュメントを参照してください。
JPA リポジトリーの設定に関する詳細は、JPA Component ドキュメント、Spring JPA ドキュメント、および Camel JPA unit test を参照してください。
Spring XML の例 リンクのコピーリンクがクリップボードにコピーされました!
以下の例では、myMessageId
ヘッダーを使用して重複をフィルターします。
JDBC リポジトリーを使用したべき等コンシューマー リンクのコピーリンクがクリップボードにコピーされました!
JDBC リポジトリーは、Idemopotent Consumer パターンのメッセージ ID の格納でもサポートされます。JDBC リポジトリーの実装は SQL コンポーネントによって提供されるので、Maven ビルドシステムを使用している場合は、camel-sql
アーティファクトに依存関係を追加します。
SQL データベースへの接続をインスタンス化するために、Spring persistence API から SingleConnectionDataSource
JDBC ラッパークラスを使用できます。たとえば、HyperSQL データベースインスタンスへの JDBC 接続をインスタンス化するには、以下の JDBC データソースを定義できます。
前述の JDBC データソースは HyperSQL mem
プロトコルを使用し、メモリーのみのデータベースインスタンスを作成します。これは、HyperSQL データベースの簡易実装で、永続的では ありません。
前述のデータソースを使用して、JDBC メッセージ ID リポジトリーを使用する Idempotent Consumer パターンを以下のように定義できます。
ルートで重複メッセージを処理する方法 リンクのコピーリンクがクリップボードにコピーされました!
Camel 2.8 から利用可能
skipDuplicate
オプションを false
に設定して、べき等コンシューマーに重複したメッセージもルーティングするように指示できるようになりました。ただし、「エクスチェンジ」 プロパティーを true に設定することにより、重複メッセージが duplicate とマークされます。重複メッセージを検出し、処理するために 「Content-Based Router」 または 「Message Filter」 が使用されます。
以下の例では、メッセージを重複エンドポイントに送信するために 「Message Filter」 を使用し、メッセージのルーティングを停止します。
XML DSL の例を以下に示します。
データグリッドを使用したクラスター環境で重複メッセージを処理する方法 リンクのコピーリンクがクリップボードにコピーされました!
クラスター環境で Camel を実行している場合、メモリーのべき等リポジトリーでは機能しません (上記を参照)。中央のデータベースを設定するか、Hazelcast データ グリッドを基にしたべき等コンシューマー実装を使用できます。Hazelcast は、マルチキャスト経由のノード (デフォルトは tcp-ip の Hazelcast を設定) を見つけ、マップベースのリポジトリーを自動的に作成します。
HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo"); from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");
HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");
from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");
各メッセージ ID を保持するリポジトリーの期間を定義する必要があります (デフォルトは削除しません)。メモリーが不足しないようにするには、Hazelcast 設定 に基づいてエビクションストラテジーを作成する必要が あります。詳細は、Hazelcast を参照してください。
http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent Repository tutorial] を参照してください。
Apache Karaf を使用して、2 つのクラスターノードにべき等リポジトリーを設定する方法について詳しく説明します。
オプション リンクのコピーリンクがクリップボードにコピーされました!
Idempotent Consumer には以下のオプションがあります。
オプション | デフォルト | 説明 |
|
| Camel 2.0: Eager は、エクスチェンジの処理前後に Camel がメッセージをリポジトリーに追加するかどうかを制御します。これを事前に有効にしておくと、メッセージが現在進行中であっても、Camel は重複メッセージを検出できます。無効にすると、Camel はメッセージが正常に処理されたときにのみ重複を検出します。 |
|
|
レジストリーで検索するための |
|
|
camel 2.8: 重複メッセージをスキップするかどうかを設定します。 |
|
| camel 2.16: エクスチェンジが完了すると、Idempotent コンシューマーの Eager を完了するかどうかを設定します。
|