11.8. Idempotent Consumer
概要
Idempotent Consumer パターンは、重複したメッセージをフィルターするために使用され ます。たとえば、システム障害により、メッセージングシステムとコンシューマーエンドポイント間の接続が突然失われるシナリオについて考えてみましょう。メッセージングシステムがメッセージの送信中だった場合は、コンシューマーが最後のメッセージを受け取ったかどうかが不明な可能性があります。配信の信頼性を向上させるために、メッセージングシステムは、接続が再確立され次第、メッセージを再配信することを決定する場合があります。ただし、これにより、コンシューマーが重複したメッセージを受信する可能性があり、場合によっては、メッセージの重複により、望ましくない結果 (口座から 2 回お金が引き落とされるなど) となる可能性があります。このシナリオでは、メッセージストリームから不必要な重複を取り除くために、べき等コンシューマーを使用します。
Camel は以下のべき等コンシューマー実装を提供します。
インメモリーキャッシュを持つべき等コンシューマー
Apache Camel では、Idempotent Consumer パターンが idempotentConsumer()
プロセッサーによって実装されます。これには、以下の 2 つの引数を持ちます。
-
messageIdExpression
: 現在のメッセージのメッセージ ID 文字列を返す式。 -
messageIdRepository
: 受信したすべてのメッセージの ID を格納するメッセージ ID リポジトリーへの参照。
各メッセージを受信すると、べき等コンシューマープロセッサーは、リポジトリーの現在のメッセージ ID を検索し、このメッセージが以前にあったものかを確認します。yes の場合、メッセージは破棄されます。no の場合、メッセージは渡され、その ID がリポジトリーに追加されます。
例11.1「インメモリーキャッシュを使用した重複メッセージのフィルタリング」 に表示されているコードは、TransactionID
ヘッダーを使用して重複をフィルタリングします。
例11.1 インメモリーキャッシュを使用した重複メッセージのフィルタリング
import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository; ... RouteBuilder builder = new RouteBuilder() { public void configure() { from("seda:a") .idempotentConsumer( header("TransactionID"), memoryMessageIdRepository(200) ).to("seda:b"); } };
memoryMessageIdRepository(200)
が呼び出されると、最大 200 個のメッセージ ID を保持できるインメモリーキャッシュが作成されます。
XML 設定を使用してべき等コンシューマーを定義することもできます。たとえば、以下のように XML で前述のルートを定義できます。
<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <idempotentConsumer messageIdRepositoryRef="MsgIDRepos"> <simple>header.TransactionID</simple> <to uri="seda:b"/> </idempotentConsumer> </route> </camelContext> <bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository"> <!-- Specify the in-memory cache size. --> <constructor-arg type="int" value="200"/> </bean>
Camel 2.17 より、べき等リポジトリーはオプションのシリアライズされたヘッダーをサポートします。
JPA リポジトリーを使用したべき等コンシューマー
インメモリーキャッシュは、メモリー不足になりやすく、クラスター環境で機能しないという欠点があります。これらの欠点に対処するために、代わりに Java Persistent API (JPA) ベースのリポジトリーを使用できます。JPA メッセージ ID リポジトリーは、オブジェクト指向のデータベースを使用してメッセージ ID を保存します。たとえば、以下のように、べき等コンシューマーの JPA リポジトリーを使用するルートを定義できます。
import org.springframework.orm.jpa.JpaTemplate; import org.apache.camel.spring.SpringRouteBuilder; import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository; ... RouteBuilder builder = new SpringRouteBuilder() { public void configure() { from("seda:a").idempotentConsumer( header("TransactionID"), jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName") ).to("seda:b"); } };
JPA メッセージ ID リポジトリーは 2 つの引数で初期化されます。
-
JpaTemplate
インスタンス: JPA データベースのハンドルを指定します。 - プロセッサー名: 現在のべき等コンシューマープロセッサーを特定します。
SpringRouteBuilder.bean()
メソッドは、Spring XML ファイルで定義された Bean を参照するショートカットです。JpaTemplate
Bean は、基礎となる JPA データベースに対するハンドルを提供します。この Bean の設定方法に関する詳細は、JPA のドキュメントを参照してください。
JPA リポジトリーの設定に関する詳細は、JPA Component ドキュメント、Spring JPA ドキュメント、および「Camel JPA unit test 」を参照してください。
Spring XML の例
以下の例では、myMessageId
ヘッダーを使用して重複をフィルターします。
<!-- repository for the idempotent consumer --> <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <idempotentConsumer messageIdRepositoryRef="myRepo"> <!-- use the messageId header as key for identifying duplicate messages --> <header>messageId</header> <!-- if not a duplicate send it to this mock endpoint --> <to uri="mock:result"/> </idempotentConsumer> </route> </camelContext>
JDBC リポジトリーを使用したべき等コンシューマー
JDBC リポジトリーは、Idemopotent Consumer パターンのメッセージ ID の格納でもサポートされます。JDBC リポジトリーの実装は SQL コンポーネントによって提供されるので、Maven ビルドシステムを使用している場合は、camel-sql
アーティファクトに依存関係を追加します。
SQL データベースへの接続をインスタンス化するために、Spring persistence API から SingleConnectionDataSource
JDBC ラッパークラスを使用できます。たとえば、HyperSQL データベースインスタンスへの JDBC 接続をインスタンス化するには、以下の JDBC データソースを定義できます。
<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource"> <property name="driverClassName" value="org.hsqldb.jdbcDriver"/> <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/> <property name="username" value="sa"/> <property name="password" value=""/> </bean>
前述の JDBC データソースは HyperSQL mem
プロトコルを使用し、メモリーのみのデータベースインスタンスを作成します。これは、HyperSQL データベースの簡易実装で、永続的では ありません。
前述のデータソースを使用して、JDBC メッセージ ID リポジトリーを使用する Idempotent Consumer パターンを以下のように定義できます。
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> <constructor-arg ref="dataSource" /> <constructor-arg value="myProcessorName" /> </bean> <camel:camelContext> <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error"> <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" /> </camel:errorHandler> <camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel"> <camel:from uri="direct:start" /> <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository"> <camel:header>messageId</camel:header> <camel:to uri="mock:result" /> </camel:idempotentConsumer> </camel:route> </camel:camelContext>
ルートで重複メッセージを処理する方法
Camel 2.8 から利用可能
skipDuplicate
オプションを false
に設定して、べき等コンシューマーに重複したメッセージもルーティングするように指示できるようになりました。ただし、「エクスチェンジ」 のプロパティーを true に設定して、複製メッセージが重複としてマークされています。「Content-Based Router」 または 「Message Filter」 を使用して重複メッセージを検出し、処理することで、この事実を活用することができます。
以下の例では、「Message Filter」 を使用してメッセージを重複エンドポイントに送信し、メッセージのルーティングを停止します。
from("direct:start") // instruct idempotent consumer to not skip duplicates as we will filter then our self .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)) // filter out duplicate messages by sending them to someplace else and then stop .to("mock:duplicate") .stop() .end() // and here we process only new messages (no duplicates) .to("mock:result");
XML DSL の例を以下に示します。
<!-- idempotent repository, just use a memory based for testing --> <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- we do not want to skip any duplicate messages --> <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false"> <!-- use the messageId header as key for identifying duplicate messages --> <header>messageId</header> <!-- we will to handle duplicate messages using a filter --> <filter> <!-- the filter will only react on duplicate messages, if this property is set on the Exchange --> <property>CamelDuplicateMessage</property> <!-- and send the message to this mock, due its part of an unit test --> <!-- but you can of course do anything as its part of the route --> <to uri="mock:duplicate"/> <!-- and then stop --> <stop/> </filter> <!-- here we route only new messages --> <to uri="mock:result"/> </idempotentConsumer> </route> </camelContext>
データグリッドを使用したクラスター環境で重複メッセージを処理する方法
クラスター環境で Camel を実行している場合、メモリーのべき等リポジトリーでは機能しません (上記を参照)。中央のデータベースを設定するか、Hazelcast データ グリッドを基にしたべき等コンシューマー実装を使用できます。Hazelcast は、マルチキャスト経由のノード (デフォルトは tcp-ip の Hazelcast を設定) を見つけ、マップベースのリポジトリーを自動的に作成します。
HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo"); from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");
各メッセージ ID を保持するリポジトリーの期間を定義する必要があります (デフォルトは削除しません)。メモリーが不足しないようにするには、Hazelcast 設定に基づいてエビクションストラテジーを作成する必要が あります。詳細は「 Hazelcast 」を参照してください。
http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent Repository tutorial] を参照してください。
Apache Karaf を使用して、2 つのクラスターノードにべき等リポジトリーを設定する方法について詳しく説明します。
オプション
Idempotent Consumer には以下のオプションがあります。
オプション | デフォルト | 説明 |
|
| Camel 2.0: Eager は、エクスチェンジの処理前後に Camel がメッセージをリポジトリーに追加するかどうかを制御します。これを事前に有効にしておくと、メッセージが現在進行中であっても、Camel は重複メッセージを検出できます。無効にすると、Camel はメッセージが正常に処理されたときにのみ重複を検出します。 |
|
|
レジストリーで検索するための |
|
|
camel 2.8: 重複メッセージをスキップするかどうかを設定します。 |
|
| camel 2.16: エクスチェンジが完了すると、Idempotent コンシューマーの Eager を完了するかどうかを設定します。
|