11.8. idempotent Consumer


概述

幂等的消费者 模式用于过滤重复的消息。例如,假设消息传递系统和消费者端点之间的连接由于系统中的某种故障而完全丢失。如果消息传递系统在传输消息的中间,这可能并不明确,无论消费者是否收到最后一条消息。为提高交付可靠性,在重新建立连接后,消息传递系统可能会决定重新建立此类消息。不幸的是,这需要消费者可能会收到重复消息的风险,在某些情况下,复制信息的影响可能会存在不必要的后果(例如从您的帐户中减去了一倍的费用)。在这种情况下,可以使用幂等消费者从消息流中去除不必要的重复项。

Camel 提供以下 Idempotent Consumer 实现:

带有内存缓存的幂等消费者

在 Apache Camel 中,幂等消费者模式由 idempotentConsumer () 处理器实现,该处理器采用两个参数:

  • messageIdExpression wagon- swig An 表达式,该表达式返回当前消息的消息 ID 字符串。
  • messageIdRepository wagon-wagon A 引用消息 ID 存储库,该存储库存储收到的所有消息的 ID。

当每条消息都出现时,幂等消费者处理器会在存储库中查找当前消息 ID,以查看之前是否看到了此消息。如果为 yes,则会丢弃消息;如果没有,则允许消息通过,并将其 ID 添加到存储库中。

例 11.1 “使用内存缓存过滤重复的消息” 中显示的代码使用 TransactionID 标头过滤掉重复项。

例 11.1. 使用内存缓存过滤重复的消息

import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
...
RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .idempotentConsumer(
             header("TransactionID"),
             memoryMessageIdRepository(200)
          ).to("seda:b");
    }
};

如果调用 memoryMessageIdRepository (200),它会创建一个可以最多 200 个消息 ID 的内存中缓存。

您还可以使用 XML 配置定义幂等消费者。例如,您可以在 XML 中定义前面的路由,如下所示:

<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <idempotentConsumer messageIdRepositoryRef="MsgIDRepos">
      <simple>header.TransactionID</simple>
      <to uri="seda:b"/>
    </idempotentConsumer>
  </route>
</camelContext>

<bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository">
    <!-- Specify the in-memory cache size. -->
    <constructor-arg type="int" value="200"/>
</bean>
注意

从 Camel 2.17 中,Idempotent Repository 支持可选的序列化标头。

使用 JPA repository 的幂等消费者

内存缓存不会造成轻松耗尽内存的缺点,且在集群环境中无法正常工作。要克服这些缺点,您可以使用基于 Java Persistent API (NFD)的存储库。JPA 消息 ID 存储库使用面向对象的数据库来存储消息 ID。例如,您可以定义将 JPA 存储库用于幂等消费者的路由,如下所示:

import org.springframework.orm.jpa.JpaTemplate;

import org.apache.camel.spring.SpringRouteBuilder;
import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository;
...
RouteBuilder builder = new SpringRouteBuilder() {
    public void configure() {
        from("seda:a").idempotentConsumer(
          header("TransactionID"),
          jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName")
        ).to("seda:b");
    }
};

JPA 消息 ID 存储库使用两个参数初始化:

  • JpaTemplate instance swig-wagonProvides 是 JPA 数据库的句柄。
  • 处理器名称 swig-rhacmIdent 表示当前的幂等消费者处理器。

SpringRouteBuilder.bean () 方法是引用 Spring XML 文件中定义的 bean 的快捷方式。JpaTemplate bean 为底层 JPA 数据库提供句柄。有关如何配置此 bean 的详细信息,请参阅 JPA 文档。

有关设置 JPA 存储库的更多详细信息,请参阅 JPA 组件 文档、Spring JPA 文档和 Camel JPA 单元测试 中的示例代码。

Spring XML 示例

以下示例使用 myMessageId 标头过滤掉重复项:

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>

使用 JDBC 存储库的幂等消费者

也支持 JDBC 存储库将消息 ID 存储在幂等消费者模式中。JDBC 存储库的实现由 SQL 组件提供,因此如果您使用 Maven 构建系统,请添加对 camel-sql 工件的依赖项。

您可以使用 Spring persistence API 中的 SingleConnectionDataSource JDBC 打包程序类来实例化到 SQL 数据库的连接。例如,要实例化 JDBC 连接到 HyperSQL 数据库实例,您可以定义以下 JDBC 数据源:

<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
</bean>
注意

前面的 JDBC 数据源使用 HyperSQL mem 协议,它会创建一个仅内存的数据库实例。这是 HyperSQL 数据库的一个迭代实现,它实际上 不是 持久的。

使用前面的数据源,您可以定义使用 JDBC 消息 ID 存储库的幂等消费者模式,如下所示:

<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
</bean>

<camel:camelContext>
	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
	</camel:errorHandler>

	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
		<camel:from uri="direct:start" />
		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
			<camel:header>messageId</camel:header>
			<camel:to uri="mock:result" />
		</camel:idempotentConsumer>
	</camel:route>
	</camel:camelContext>

如何在路由中处理重复的消息

从 Camel 2.8 开始提供

现在,您可以将 skipDuplicate 选项设置为 false,它指示幂等消费者也路由重复消息。但是,重复的消息已通过将 “Exchanges”一节 中的属性设置为 true 来标记为重复。我们可以使用 第 8.1 节 “基于内容的路由器”第 8.2 节 “Message Filter” 来检测这个事实并处理重复的信息。

例如,在以下示例中,我们使用 第 8.2 节 “Message Filter” 将消息发送到重复的端点,然后停止该消息。

from("direct:start")
     // instruct idempotent consumer to not skip duplicates as we will filter then our self
     .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
     .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
         // filter out duplicate messages by sending them to someplace else and then stop
         .to("mock:duplicate")
         .stop()
     .end()
     // and here we process only new messages (no duplicates)
     .to("mock:result");

XML DSL 中的示例为:

 <!-- idempotent repository, just use a memory based for testing -->
 <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- we do not want to skip any duplicate messages -->
         <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false">
             <!-- use the messageId header as key for identifying duplicate messages -->
             <header>messageId</header>
             <!-- we will to handle duplicate messages using a filter -->
             <filter>
                 <!-- the filter will only react on duplicate messages, if this property is set on the Exchange -->
                 <property>CamelDuplicateMessage</property>
                 <!-- and send the message to this mock, due its part of an unit test -->
                 <!-- but you can of course do anything as its part of the route -->
                 <to uri="mock:duplicate"/>
                 <!-- and then stop -->
                 <stop/>
             </filter>
             <!-- here we route only new messages -->
             <to uri="mock:result"/>
         </idempotentConsumer>
     </route>
 </camelContext>

如何使用数据网格处理集群环境中的重复消息

如果您在集群环境中运行 Camel,则内存幂等存储库中的 不起作用(请参阅上述内容)。您可以设置中央数据库,或使用基于 Hazelcast 数据网格的幂等消费者实施。Hazelcast 发现节点多播(默认为 tcp-ip)配置 Hazelcast,并创建一个基于映射的存储库:

HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

您必须定义存储库应保留每个消息 ID 的时长(默认是删除它)。为避免内存不足,您应该根据 Hazelcast 配置创建 驱除策略。如需更多信息,请参阅 Hazelcast

请参阅此链接:http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent 仓库

教程]了解如何使用 Apache Karaf 在两个集群节点上设置此类幂等存储库。

选项

Idempotent Consumer 带有以下选项:

选项

默认

描述

eager

true

Camel 2.0: Eager 控制 Camel 是否在处理交换前或之后将消息添加到存储库中。如果在随后启用之前启用,则 Camel 能够检测到重复的消息,即使消息当前正在进行中。禁用 Camel 只有在成功处理消息时才会检测到重复。

messageIdRepositoryRef

null

对注册表中查找的 IdempotentRepository 的引用。使用 XML DSL 时这个选项是必须的。

skipDuplicate

true

Camel 2.8: 设置是否跳过重复消息。如果设置为 false,则消息将继续。但是,“Exchanges”一节 已通过将 Exchange.DUPLICATE_MESSAG Exchange 属性设置为 布尔值

completionEager

false

Camel 2.16 : 当交换完成后,设置是否要完成 Idempotent consumer eager。

如果您设置了 completeEager 选项 true,则 Idempotent Consumer 会在交换到达幂等消费者模式块的末尾时触发其完成。但是,如果交换在结束块后继续路由,则它不会影响幂等消费者的状态。

如果您设置了 completeEager 选项 false,则 Idempotent Consumer 会在交换完成后触发其完成,并被路由。但是,如果交换在块结束后仍然继续路由,那么它也会影响幂等消费者的状态。例如,由于交换失败,因此幂等消费者的状态将是回滚。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.