11.8. 幂等的消费者


概述

幂等的使用者 模式用于过滤重复的消息。例如,在消息传递系统和消费者端点之间的连接是由于系统中某些错误而丢失的场景。如果消息传递系统处于传输消息的中间位置,则可能不知道使用者是否收到最后一条消息。为提高交付可靠性,在重新建立连接后,消息系统可能会决定立即恢复此类消息。不幸的是,这涉及到消费者可能会收到重复消息的风险,在某些情况下,重复消息的影响可能会带来不良结果(比如从您的帐户中取钱的两倍)。在这种情况下,可以使用幂等的使用者来从消息流中不需要重复的。

Camel 提供以下 Idempotent Consumer 实现:

使用内存缓存的幂等消费者

在 Apache Camel 中,幂等使用者模式由 idempotentConsumer () 处理器实施,后者采用两个参数:

  • messageIdExpression OPTS-sandboxed An expression,用于返回当前消息的消息 ID 字符串。
  • messageIdRepository ALLOW-将 A 引用到一个消息 ID 存储库,该仓库存储了收到的所有消息的 ID。

随着每条消息的出现,幂等的使用者处理器在存储库中查找当前的消息 ID,以查看之前是否看到此消息。如果 yes,则消息被丢弃;如果没有,则允许传递消息并将其 ID 添加到存储库中。

例 11.1 “使用内存中缓存过滤 Duplicate 消息” 中显示的代码使用 TransactionID 标头过滤掉重复的内容。

例 11.1. 使用内存中缓存过滤 Duplicate 消息

import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
...
RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .idempotentConsumer(
             header("TransactionID"),
             memoryMessageIdRepository(200)
          ).to("seda:b");
    }
};

其中,调用 memoryMessageIdRepository (200) 会创建一个内存中缓存,可以容纳 200 个消息 ID。

您还可以使用 XML 配置来定义幂等使用者。例如,您可以在 XML 中定义上述路由,如下所示:

<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <idempotentConsumer messageIdRepositoryRef="MsgIDRepos">
      <simple>header.TransactionID</simple>
      <to uri="seda:b"/>
    </idempotentConsumer>
  </route>
</camelContext>

<bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository">
    <!-- Specify the in-memory cache size. -->
    <constructor-arg type="int" value="200"/>
</bean>
注意

从 Camel 2.17 中,Idempotent Repository 支持可选的序列化标头。

使用 JPA 存储库的幂等消费者

内存缓存的发生于易耗尽内存的缺点,且无法在集群环境中工作。为克服这些缺点,您可以使用基于 Java Persistent API (JPA)的存储库。JPA 消息 ID 存储库使用面向对象的数据库来存储消息 ID。例如,您可以定义将 JPA 存储库用于幂等消费者的路由,如下所示:

import org.springframework.orm.jpa.JpaTemplate;

import org.apache.camel.spring.SpringRouteBuilder;
import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository;
...
RouteBuilder builder = new SpringRouteBuilder() {
    public void configure() {
        from("seda:a").idempotentConsumer(
          header("TransactionID"),
          jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName")
        ).to("seda:b");
    }
};

JPA 消息 ID 存储库使用两个参数初始化:

  • JpaTemplate 实例关闭 JPA 数据库的句柄。
  • 处理器名称关闭了当前的幂等消费者处理器。

SpringRouteBuilder.bean () 方法是一种快捷方式,引用 Spring XML 文件中定义的 bean。JpaTemplate bean 为底层 JPA 数据库提供句柄。有关如何配置此 bean 的详细信息,请参阅 JPA 文档。

有关设置 JPA 存储库的详情,请参阅 JPA 组件 文档、Spring JPA 文档和 Camel JPA 单元测试 中的示例代码。

Spring XML 示例

以下示例使用 myMessageId 标头过滤掉重复:

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>

使用 JDBC 存储库的幂等使用者

JDBC 存储库也支持在幂等使用者模式中存储消息 ID。JDBC 存储库的实施由 SQL 组件提供,因此,如果您使用 Maven 构建系统,则添加对 camel-sql 构件的依赖。

您可以使用 Spring 持久性 API 中的 SingleConnectionDataSource JDBC 打包程序类来实例化与 SQL 数据库的连接。例如,要实例化 JDBC 与 HyperSQL 数据库实例的连接,您可以定义以下 JDBC 数据源:

<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
</bean>
注意

前面的 JDBC 数据源使用 HyperSQL mem 协议,它会创建一个仅内存的数据库实例。这是对 HyperSQL 数据库的一种致力实现,它实际上不是永久的。

使用前面的数据源,您可以定义使用 JDBC 消息 ID 存储库的幂等使用者模式,如下所示:

<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
</bean>

<camel:camelContext>
	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
	</camel:errorHandler>

	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
		<camel:from uri="direct:start" />
		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
			<camel:header>messageId</camel:header>
			<camel:to uri="mock:result" />
		</camel:idempotentConsumer>
	</camel:route>
	</camel:camelContext>

如何处理路由中的重复消息

可从 Camel 2.8 开始

现在,您可以将 skipDuplicate 选项设置为 false,它指示幂等使用者也路由重复的消息。但是,重复的信息通过将 “Exchanges”一节 中的属性设置为 true 来标记为重复。我们可以使用 第 8.1 节 “基于内容的路由器”第 8.2 节 “消息过滤器” 来检测并处理重复信息,从而利用这一事实。

例如,在以下示例中,我们使用 第 8.2 节 “消息过滤器” 将消息发送到重复的端点,然后停止继续路由该消息。

from("direct:start")
     // instruct idempotent consumer to not skip duplicates as we will filter then our self
     .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
     .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
         // filter out duplicate messages by sending them to someplace else and then stop
         .to("mock:duplicate")
         .stop()
     .end()
     // and here we process only new messages (no duplicates)
     .to("mock:result");

XML DSL 中的示例示例如下:

 <!-- idempotent repository, just use a memory based for testing -->
 <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- we do not want to skip any duplicate messages -->
         <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false">
             <!-- use the messageId header as key for identifying duplicate messages -->
             <header>messageId</header>
             <!-- we will to handle duplicate messages using a filter -->
             <filter>
                 <!-- the filter will only react on duplicate messages, if this property is set on the Exchange -->
                 <property>CamelDuplicateMessage</property>
                 <!-- and send the message to this mock, due its part of an unit test -->
                 <!-- but you can of course do anything as its part of the route -->
                 <to uri="mock:duplicate"/>
                 <!-- and then stop -->
                 <stop/>
             </filter>
             <!-- here we route only new messages -->
             <to uri="mock:result"/>
         </idempotentConsumer>
     </route>
 </camelContext>

如何使用数据网格处理集群环境中的重复消息

如果您在集群环境中运行 Camel,那么在内存幂等存储库中无法正常工作(请参阅上述操作)。您可以设置中央数据库,或使用基于 Hazelcast 数据网格的幂等用户实施。Hazelcast 通过多播查找节点(默认为 - 为 tcp-ip 配置 Hazelcast),并自动创建基于映射的存储库:

HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

您必须定义存储库应保存每条消息 id 的时长(默认为删除它永不)。为避免您耗尽内存,您应该基于 Hazelcast 配置创建 驱除策略。如需更多信息,请参阅 Hazelcast

请参阅此链接:http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent 仓库

教程] 了解有关如何使用 Apache Karaf 在两个群集节点上设置这样的幂等存储库的更多信息。

选项

Idempotent Consumer 具有以下选项:

选项

默认值

描述

eager

true

Camel 2.0: 控制 Camel 在处理交换之前或之后是否向存储库添加消息。如果在之前启用,Camel 也将能够检测重复的信息,即使消息当前正在进行中。禁用 Camel 只会检测在成功处理消息时的重复性。

messageIdRepositoryRef

null

对 registry 中查找 的 IdeotentRepository 的引用。在使用 XML DSL 时,需要这个选项。

skipDuplicate

true

Camel 2.8: 设置是否跳过重复的消息。如果设置为 false,则消息将被续。但是,“Exchanges”一节 已经通过将 Exchange.DUPLICATE_MESSAG Exchange 属性设置为 Boolean.TRUE 值来标记为重复。

completionEager

false

Camel 2.16: 设置在交换完成时是否完成 Idempotent consumereager。

如果您设置了 completeEager 选项 true,则当交换到达幂等消费者块的末尾时,Idempent Consumer 会触发其完成。但是,如果交换在结尾块后继续路由,它不会影响幂等使用者的状态。

如果您设置了 completeEager 选项 false,则 Idempotent Consumer 在交换完成后触发其完成,并被路由。但是,如果交换在块结束后继续路由,它也会影响幂等使用者的状态。例如,由于交换出现故障,因此幂等使用者的状态将是回滚。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.