Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.
9.8. Idempotent Consumer
Overview
The idempotent consumer pattern is used to filter out duplicate messages. For example, consider a scenario where the connection between a messaging system and a consumer endpoint is abruptly lost due to some fault in the system. If the messaging system was in the middle of transmitting a message, it might be unclear whether or not the consumer received the last message. To improve delivery reliability, the messaging system might decide to redeliver such messages as soon as the connection is re-established. Unfortunately, this entails the risk that the consumer might receive duplicate messages and, in some cases, the effect of duplicating a message may have undesirable consequences (such as debiting a sum of money twice from your account). In this scenario, an idempotent consumer could be used to weed out undesired duplicates from the message stream.
Camel provides the following Idempotent Consumer implementations:
MemoryIdempotentRepository
Idempotent consumer with in-memory cache
In Apache Camel, the idempotent consumer pattern is implemented by the idempotentConsumer() processor, which takes two arguments:
- messageIdExpression: An expression that returns a message ID string for the current message.
- messageIdRepository: A reference to a message ID repository, which stores the IDs of all the messages received.
As each message comes in, the idempotent consumer processor looks up the current message ID in the repository to see if this message has been seen before. If yes, the message is discarded; if no, the message is allowed to pass and its ID is added to the repository.
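The lookup-then-store behavior described above can be modeled in a few lines of plain Java. This is only an illustrative sketch, not Camel's implementation: the class name IdempotentFilterSketch and its methods are hypothetical, and a HashSet stands in for the message ID repository.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative model only; this is not Camel's implementation.
class IdempotentFilterSketch {
    // Plays the role of the message ID repository.
    private final Set<String> seenIds = new HashSet<>();

    // Returns true if the message may pass, false if it is a duplicate.
    boolean accept(String messageId) {
        // Set.add returns false when the ID is already present,
        // so the lookup and the insert happen in one step.
        return seenIds.add(messageId);
    }

    // Filters a sequence of message IDs, keeping only first occurrences.
    List<String> filter(List<String> messageIds) {
        List<String> passed = new ArrayList<>();
        for (String id : messageIds) {
            if (accept(id)) {
                passed.add(id);
            }
        }
        return passed;
    }
}
```

Because the set is unbounded, this model keeps every ID it has ever seen, which is why a real repository needs a size limit or an eviction policy.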
The code shown in Example 9.1, “Filtering Duplicate Messages with an In-memory Cache” uses the TransactionID header to filter out duplicates.
Example 9.1. Filtering Duplicate Messages with an In-memory Cache
Where the call to memoryMessageIdRepository(200) creates an in-memory cache that can hold up to 200 message IDs.
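To show what a size-limited ID cache implies, the following sketch bounds the cache with a LinkedHashMap. It assumes least-recently-used eviction, which may differ from the exact policy of the Camel repository. The class name BoundedIdCacheSketch is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of a bounded message ID cache (assumes LRU eviction).
class BoundedIdCacheSketch {
    private final Map<String, Boolean> ids;

    BoundedIdCacheSketch(final int capacity) {
        // accessOrder = true keeps the least recently used entry first.
        this.ids = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Evict the eldest entry once the capacity is exceeded.
                return size() > capacity;
            }
        };
    }

    // Returns true for a new ID (and records it), false for a known ID.
    boolean add(String messageId) {
        if (ids.get(messageId) != null) { // get() also refreshes recency
            return false;
        }
        ids.put(messageId, Boolean.TRUE);
        return true;
    }

    int size() {
        return ids.size();
    }
}
```

A consequence of any bounded cache is that an ID evicted from it is forgotten, so a late duplicate of that message would pass through undetected.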
You can also define an idempotent consumer using XML configuration. For example, you can define the preceding route in XML, as follows:
Idempotent consumer with JPA repository
The in-memory cache has two disadvantages: it can easily run out of memory, and it does not work in a clustered environment. To overcome these disadvantages, you can use a Java Persistence API (JPA) based repository instead. The JPA message ID repository uses an object-oriented database to store the message IDs. For example, you can define a route that uses a JPA repository for the idempotent consumer, as follows:
The JPA message ID repository is initialized with two arguments:
- JpaTemplate instance: Provides the handle for the JPA database.
- processor name: Identifies the current idempotent consumer processor.
The SpringRouteBuilder.bean() method is a shortcut that references a bean defined in the Spring XML file. The JpaTemplate bean provides a handle to the underlying JPA database. See the JPA documentation for details of how to configure this bean.
For more details about setting up a JPA repository, see the JPA Component documentation, the Spring JPA documentation, and the sample code in the Camel JPA unit test.
Spring XML example
The following example uses the myMessageId header to filter out duplicates:
Idempotent consumer with JDBC repository
A JDBC repository is also supported for storing message IDs in the idempotent consumer pattern. The implementation of the JDBC repository is provided by the SQL component, so if you are using the Maven build system, add a dependency on the camel-sql artifact.
You can use the SingleConnectionDataSource JDBC wrapper class from the Spring persistence API to instantiate the connection to a SQL database. For example, to instantiate a JDBC connection to a HyperSQL database instance, you could define the following JDBC data source:
Note
The preceding JDBC data source uses the HyperSQL mem protocol, which creates a memory-only database instance. Such an instance is suitable only for demonstration, because it is not actually persistent.
Using the preceding data source, you can define an idempotent consumer pattern that uses the JDBC message ID repository, as follows:
How to handle duplicate messages in the route
Available as of Camel 2.8
You can set the skipDuplicate option to false, which instructs the idempotent consumer to route duplicate messages as well. Each duplicate message is marked as a duplicate by having the Exchange.DUPLICATE_MESSAGE property on the Exchange set to true. You can use a Content-Based Router or Message Filter to detect this property and handle duplicate messages.
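The flag-then-filter flow can be modeled in plain Java as follows. This is a hedged sketch rather than Camel code: the class SkipDuplicateSketch, the property key, and the two list "endpoints" are hypothetical stand-ins, and a HashMap plays the role of the Exchange properties.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model of skipDuplicate=false followed by a message filter.
class SkipDuplicateSketch {
    // Hypothetical property key; Camel uses the Exchange.DUPLICATE_MESSAGE constant.
    static final String DUPLICATE_PROPERTY = "duplicateMessage";

    final Set<String> repository = new HashSet<>();
    final List<String> duplicateEndpoint = new ArrayList<>();
    final List<String> resultEndpoint = new ArrayList<>();

    void route(String messageId) {
        Map<String, Object> exchangeProperties = new HashMap<>();

        // Idempotent consumer step with skipDuplicate=false: the message is
        // never dropped here, only flagged when its ID is already known.
        boolean firstTime = repository.add(messageId);
        if (!firstTime) {
            exchangeProperties.put(DUPLICATE_PROPERTY, Boolean.TRUE);
        }

        // Message filter step: divert flagged messages and stop routing them.
        if (Boolean.TRUE.equals(exchangeProperties.get(DUPLICATE_PROPERTY))) {
            duplicateEndpoint.add(messageId);
            return;
        }

        // Only first-time messages reach the rest of the route.
        resultEndpoint.add(messageId);
    }
}
```

Routing the IDs a, b, a, c through this model delivers a, b, and c to the result list and the second a to the duplicate list.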
The following example uses a Message Filter to send each duplicate message to a duplicate endpoint and then stop routing that message.
The same example in XML DSL would be:
How to handle duplicate messages in a clustered environment with a data grid
If you are running Camel in a clustered environment, an in-memory idempotent repository does not work (see above). You can either set up a central database or use the idempotent consumer implementation based on the Hazelcast data grid. Hazelcast discovers the nodes over multicast by default (you can configure Hazelcast to use tcp-ip instead) and automatically creates a map-based repository:
HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");
from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");
You have to define how long the repository should hold each message ID (by default, IDs are never deleted). To avoid running out of memory, you should create an eviction strategy based on the Hazelcast configuration. For additional information, see camel-hazelcast.
See this tutorial for how to set up such an idempotent repository on two cluster nodes using Apache Karaf.
Options
The Idempotent Consumer has the following options:
Option | Default | Description |
---|---|---|
eager | true | Camel 2.0: Controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled, Camel can detect duplicate messages even while messages are still in progress. If disabled, Camel detects duplicates only after a message has been processed successfully. |
messageIdRepositoryRef | null | A reference to an IdempotentRepository to look up in the registry. This option is mandatory when using XML DSL. |
skipDuplicate | true | Camel 2.8: Sets whether to skip duplicate messages. If set to false, the message continues to be routed. However, the Exchange is marked as a duplicate by having the Exchange.DUPLICATE_MESSAGE exchange property set to a Boolean.TRUE value. |
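The difference made by the eager option can be modeled in plain Java. The sketch below is illustrative only: the class EagerOptionSketch is hypothetical, and removing the ID again when processing fails is an assumption of this sketch rather than something stated in the table.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative model of the eager option; not Camel's implementation.
class EagerOptionSketch {
    private final Set<String> repository = new HashSet<>();
    private final boolean eager;

    EagerOptionSketch(boolean eager) {
        this.eager = eager;
    }

    // Returns false if the message was rejected as a duplicate.
    boolean handle(String messageId, Runnable processing) {
        if (eager) {
            // Eager: record the ID before processing starts.
            if (!repository.add(messageId)) {
                return false;
            }
        } else if (repository.contains(messageId)) {
            // Non-eager: only IDs of completed messages are known.
            return false;
        }
        try {
            processing.run();
        } catch (RuntimeException e) {
            if (eager) {
                // Assumption: forget the ID so that a redelivery can pass.
                repository.remove(messageId);
            }
            throw e;
        }
        if (!eager) {
            // Non-eager: record the ID only after successful processing.
            repository.add(messageId);
        }
        return true;
    }
}
```

If a second copy of a message arrives while the first is still being processed, the eager variant rejects it, whereas the non-eager variant lets it through because the ID has not been stored yet.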