11.8. idempotent Consumer
概述
幂等的消费者 模式用于过滤重复的消息。例如,假设消息传递系统和消费者端点之间的连接由于系统中的某种故障而完全丢失。如果消息传递系统在传输消息的中间,这可能并不明确,无论消费者是否收到最后一条消息。为提高交付可靠性,在重新建立连接后,消息传递系统可能会决定重新建立此类消息。不幸的是,这需要消费者可能会收到重复消息的风险,在某些情况下,复制信息的影响可能会存在不必要的后果(例如从您的帐户中减去了一倍的费用)。在这种情况下,可以使用幂等消费者从消息流中去除不必要的重复项。
Camel 提供以下 Idempotent Consumer 实现:
-
MemoryIdempotentRepository
- KafkaIdempotentRepository
- File
- Hazelcast
- SQL
- JPA
带有内存缓存的幂等消费者
在 Apache Camel 中,幂等消费者模式由 idempotentConsumer ()
处理器实现,该处理器采用两个参数:
-
messageIdExpression
wagon- swig An 表达式,该表达式返回当前消息的消息 ID 字符串。 -
messageIdRepository
wagon-wagon A 引用消息 ID 存储库,该存储库存储收到的所有消息的 ID。
当每条消息都出现时,幂等消费者处理器会在存储库中查找当前消息 ID,以查看之前是否看到了此消息。如果为 yes,则会丢弃消息;如果没有,则允许消息通过,并将其 ID 添加到存储库中。
例 11.1 “使用内存缓存过滤重复的消息” 中显示的代码使用 TransactionID
标头过滤掉重复项。
例 11.1. 使用内存缓存过滤重复的消息
import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository; ... RouteBuilder builder = new RouteBuilder() { public void configure() { from("seda:a") .idempotentConsumer( header("TransactionID"), memoryMessageIdRepository(200) ).to("seda:b"); } };
如果调用 memoryMessageIdRepository (200)
,它会创建一个可以最多 200 个消息 ID 的内存中缓存。
您还可以使用 XML 配置定义幂等消费者。例如,您可以在 XML 中定义前面的路由,如下所示:
<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <idempotentConsumer messageIdRepositoryRef="MsgIDRepos"> <simple>header.TransactionID</simple> <to uri="seda:b"/> </idempotentConsumer> </route> </camelContext> <bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository"> <!-- Specify the in-memory cache size. --> <constructor-arg type="int" value="200"/> </bean>
从 Camel 2.17 中,Idempotent Repository 支持可选的序列化标头。
使用 JPA repository 的幂等消费者
内存缓存不会造成轻松耗尽内存的缺点,且在集群环境中无法正常工作。要克服这些缺点,您可以使用基于 Java Persistent API (NFD)的存储库。JPA 消息 ID 存储库使用面向对象的数据库来存储消息 ID。例如,您可以定义将 JPA 存储库用于幂等消费者的路由,如下所示:
import org.springframework.orm.jpa.JpaTemplate; import org.apache.camel.spring.SpringRouteBuilder; import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository; ... RouteBuilder builder = new SpringRouteBuilder() { public void configure() { from("seda:a").idempotentConsumer( header("TransactionID"), jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName") ).to("seda:b"); } };
JPA 消息 ID 存储库使用两个参数初始化:
-
JpaTemplate
instance swig-wagonProvides 是 JPA 数据库的句柄。 - 处理器名称 swig-rhacmIdent 表示当前的幂等消费者处理器。
SpringRouteBuilder.bean ()
方法是引用 Spring XML 文件中定义的 bean 的快捷方式。JpaTemplate
bean 为底层 JPA 数据库提供句柄。有关如何配置此 bean 的详细信息,请参阅 JPA 文档。
有关设置 JPA 存储库的更多详细信息,请参阅 JPA 组件 文档、Spring JPA 文档和 Camel JPA 单元测试 中的示例代码。
Spring XML 示例
以下示例使用 myMessageId
标头过滤掉重复项:
<!-- repository for the idempotent consumer --> <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <idempotentConsumer messageIdRepositoryRef="myRepo"> <!-- use the messageId header as key for identifying duplicate messages --> <header>messageId</header> <!-- if not a duplicate send it to this mock endpoint --> <to uri="mock:result"/> </idempotentConsumer> </route> </camelContext>
使用 JDBC 存储库的幂等消费者
也支持 JDBC 存储库将消息 ID 存储在幂等消费者模式中。JDBC 存储库的实现由 SQL 组件提供,因此如果您使用 Maven 构建系统,请添加对 camel-sql
工件的依赖项。
您可以使用 Spring persistence API 中的 SingleConnectionDataSource
JDBC 打包程序类来实例化到 SQL 数据库的连接。例如,要实例化 JDBC 连接到 HyperSQL 数据库实例,您可以定义以下 JDBC 数据源:
<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource"> <property name="driverClassName" value="org.hsqldb.jdbcDriver"/> <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/> <property name="username" value="sa"/> <property name="password" value=""/> </bean>
前面的 JDBC 数据源使用 HyperSQL mem
协议,它会创建一个仅内存的数据库实例。这是 HyperSQL 数据库的一个迭代实现,它实际上 不是 持久的。
使用前面的数据源,您可以定义使用 JDBC 消息 ID 存储库的幂等消费者模式,如下所示:
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> <constructor-arg ref="dataSource" /> <constructor-arg value="myProcessorName" /> </bean> <camel:camelContext> <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error"> <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" /> </camel:errorHandler> <camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel"> <camel:from uri="direct:start" /> <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository"> <camel:header>messageId</camel:header> <camel:to uri="mock:result" /> </camel:idempotentConsumer> </camel:route> </camel:camelContext>
如何在路由中处理重复的消息
从 Camel 2.8 开始提供
现在,您可以将 skipDuplicate
选项设置为 false
,它指示幂等消费者也路由重复消息。但是,重复的消息已通过将 “Exchanges”一节 中的属性设置为 true 来标记为重复。我们可以使用 第 8.1 节 “基于内容的路由器” 或 第 8.2 节 “Message Filter” 来检测这个事实并处理重复的信息。
例如,在以下示例中,我们使用 第 8.2 节 “Message Filter” 将消息发送到重复的端点,然后停止该消息。
from("direct:start") // instruct idempotent consumer to not skip duplicates as we will filter then our self .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)) // filter out duplicate messages by sending them to someplace else and then stop .to("mock:duplicate") .stop() .end() // and here we process only new messages (no duplicates) .to("mock:result");
XML DSL 中的示例为:
<!-- idempotent repository, just use a memory based for testing --> <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- we do not want to skip any duplicate messages --> <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false"> <!-- use the messageId header as key for identifying duplicate messages --> <header>messageId</header> <!-- we will to handle duplicate messages using a filter --> <filter> <!-- the filter will only react on duplicate messages, if this property is set on the Exchange --> <property>CamelDuplicateMessage</property> <!-- and send the message to this mock, due its part of an unit test --> <!-- but you can of course do anything as its part of the route --> <to uri="mock:duplicate"/> <!-- and then stop --> <stop/> </filter> <!-- here we route only new messages --> <to uri="mock:result"/> </idempotentConsumer> </route> </camelContext>
如何使用数据网格处理集群环境中的重复消息
如果您在集群环境中运行 Camel,则内存幂等存储库中的 不起作用(请参阅上述内容)。您可以设置中央数据库,或使用基于 Hazelcast 数据网格的幂等消费者实施。Hazelcast 发现节点多播(默认为 tcp-ip)配置 Hazelcast,并创建一个基于映射的存储库:
HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo"); from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");
您必须定义存储库应保留每个消息 ID 的时长(默认是删除它)。为避免内存不足,您应该根据 Hazelcast 配置创建 驱除策略。如需更多信息,请参阅 Hazelcast。
请参阅此链接:http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent 仓库
教程]了解如何使用 Apache Karaf 在两个集群节点上设置此类幂等存储库。
选项
Idempotent Consumer 带有以下选项:
选项 | 默认 | 描述 |
|
| Camel 2.0: Eager 控制 Camel 是否在处理交换前或之后将消息添加到存储库中。如果在随后启用之前启用,则 Camel 能够检测到重复的消息,即使消息当前正在进行中。禁用 Camel 只有在成功处理消息时才会检测到重复。 |
|
|
对注册表中查找的 |
|
|
Camel 2.8: 设置是否跳过重复消息。如果设置为 |
|
| Camel 2.16 : 当交换完成后,设置是否要完成 Idempotent consumer eager。
如果您设置了
如果您设置了 |