11.8. 幂等的消费者


概述

幂等的使用者 模式用于过滤重复的消息。例如,在消息传递系统和消费者端点之间进行连接的情况会因为系统中的一些故障而丢失。如果消息传递系统位于传输消息的中间,这可能并不明确,消费者是否收到了最后一条消息。为提高交付可靠性,消息传递系统可能决定在重新建立连接后尽快进行此类消息。不幸的是,这要求消费者可能会收到重复消息的风险,在某些情况下,重复消息的结果可能会无法产生不可取的后果(例如,从您的帐户中取消了两次的资金总和两次)。在这种情况下,可以使用幂等的消费者来从消息流中意外重复。

Camel 提供以下 Idempotent Consumer 实现:

带有内存缓存的幂等消费者

在 Apache Camel 中,幂等的使用者模式由 idempotentConsumer() 处理器实施,它采用两个参数:

  • messageIdExpression abrt-abrt An 表达式为当前消息返回消息 ID 字符串。
  • messageIdRepository >_< A reference to a message ID repository,它存储收到的所有消息的 ID。

当每条消息出现时,幂等使用者处理器会在存储库中查找当前消息 ID,以查看此消息之前是否已看到。如果 yes,则会丢弃消息;如果没有,则消息被允许通过,并将其 ID 添加到存储库中。

例 11.1 “使用内存缓存过滤 Duplicate 消息” 中显示的代码使用 TransactionID 标头来过滤掉重复。

例 11.1. 使用内存缓存过滤 Duplicate 消息

import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
...
RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .idempotentConsumer(
             header("TransactionID"),
             memoryMessageIdRepository(200)
          ).to("seda:b");
    }
};

其中调用 memoryMessageIdRepository(200) 会创建一个内存缓存,它最多可容纳 200 消息 ID。

您还可以使用 XML 配置来定义幂等消费者。例如,您可以在 XML 中定义前面的路由,如下所示:

<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <idempotentConsumer messageIdRepositoryRef="MsgIDRepos">
      <simple>header.TransactionID</simple>
      <to uri="seda:b"/>
    </idempotentConsumer>
  </route>
</camelContext>

<bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository">
    <!-- Specify the in-memory cache size. -->
    <constructor-arg type="int" value="200"/>
</bean>
注意

从 Camel 2.17 中,Idempotent Repository 支持可选的序列化标头。

使用 JPA 存储库的幂等消费者

内存缓存的缺陷影响内存不足,且不能在集群环境中工作。要克服这些缺点,您可以使用基于 Java Persistent API(JPA)的存储库。JPA 消息 ID 存储库使用面向对象的数据库来存储消息 ID。例如,您可以定义将 JPA 存储库用于幂等消费者的路由,如下所示:

import org.springframework.orm.jpa.JpaTemplate;

import org.apache.camel.spring.SpringRouteBuilder;
import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository;
...
RouteBuilder builder = new SpringRouteBuilder() {
    public void configure() {
        from("seda:a").idempotentConsumer(
          header("TransactionID"),
          jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName")
        ).to("seda:b");
    }
};

JPA 消息 ID 存储库使用两个参数初始化:

  • JpaTemplate instance>_<-jaxbProvides the JPA 数据库的句柄。
  • 处理器 name 3.10.0--jaxbIdent 表示当前的幂等消费者处理器。

SpringRouteBuilder.bean() 方法是引用 Spring XML 文件中定义的 bean 的快捷方式。JpaTemplate bean 为底层 JPA 数据库提供了句柄。有关如何配置此 bean 的详情,请查看 JPA 文档。

有关设置 JPA 存储库的详情,请参阅 JPA 组件 文档、Spring JPA 文档和 Camel JPA 单元测试 中的示例代码。

Spring XML 示例

以下示例使用 myMessageId 标头过滤掉重复:

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>

具有 JDBC 存储库的幂等消费者

JDBC 存储库也支持将消息 ID 存储在幂等消费者模式中。JDBC 存储库的实现由 SQL 组件提供。因此,如果您使用 Maven 构建系统,请添加对 camel-sql 工件的依赖项。

您可以使用 Spring persistence API 中的 SingleConnectionDataSource wrapper 类来实例化与 SQL 数据库的连接。例如,若要将 JDBC 连接实例化到 HyperSQL 数据库实例,您可以定义以下 JDBC 数据源:

<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
</bean>
注意

前面的 JDBC 数据源使用 HyperSQL mem 协议,该协议会创建一个仅内存的数据库实例。这是 尚未 持久的 HyperSQL 数据库的实施。

使用前面的数据源,您可以定义一个使用 JDBC 消息 ID 存储库的幂等消费者模式,如下所示:

<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
</bean>

<camel:camelContext>
	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
	</camel:errorHandler>

	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
		<camel:from uri="direct:start" />
		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
			<camel:header>messageId</camel:header>
			<camel:to uri="mock:result" />
		</camel:idempotentConsumer>
	</camel:route>
	</camel:camelContext>

如何在路由中处理重复的信息

可作为 Camel 2.8 提供。

现在,您可以将 skipDuplicate 选项设置为 false,它会指示幂等的使用者也用于路由重复的消息。但是,重复的消息已通过将 “交换”一节 中的属性设为 true 来被标记为重复。我们可以使用 第 8.1 节 “基于内容的路由器”第 8.2 节 “Message Filter” 检测此消息并处理重复的信息来利用这个事实。

例如,在以下示例中,我们使用 第 8.2 节 “Message Filter” 发送消息到重复的端点,然后停止继续路由该消息。

from("direct:start")
     // instruct idempotent consumer to not skip duplicates as we will filter then our self
     .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
     .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
         // filter out duplicate messages by sending them to someplace else and then stop
         .to("mock:duplicate")
         .stop()
     .end()
     // and here we process only new messages (no duplicates)
     .to("mock:result");

XML DSL 中的示例如下:

 <!-- idempotent repository, just use a memory based for testing -->
 <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- we do not want to skip any duplicate messages -->
         <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false">
             <!-- use the messageId header as key for identifying duplicate messages -->
             <header>messageId</header>
             <!-- we will to handle duplicate messages using a filter -->
             <filter>
                 <!-- the filter will only react on duplicate messages, if this property is set on the Exchange -->
                 <property>CamelDuplicateMessage</property>
                 <!-- and send the message to this mock, due its part of an unit test -->
                 <!-- but you can of course do anything as its part of the route -->
                 <to uri="mock:duplicate"/>
                 <!-- and then stop -->
                 <stop/>
             </filter>
             <!-- here we route only new messages -->
             <to uri="mock:result"/>
         </idempotentConsumer>
     </route>
 </camelContext>

如何使用数据网格处理集群环境中的重复消息

如果您在集群环境中运行 Camel,则需要一个内存幂等性存储库无法工作(请参阅上面的操作)。您可以根据 Hazelcast 数据网格设置中央数据库或使用幂等的消费者实施。Hazelcast 通过多播查找节点(默认 - 为 tcp-ip 配置 Hazelcast),并创建基于映射的存储库:

HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

您必须定义存储库应当容纳每个消息 id 的时长(默认为从不删除)。为了避免内存不足,您应该基于 Hazelcast 配置创建 驱除策略。如需了解更多信息,请参阅 Hazelcast

请参阅此链接:http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent 软件仓库

教程] 了解更多有关如何使用 Apache Karaf 在两个集群节点上设置此类幂等存储库。

选项

Idempotent Consumer 有以下选项:

选项

默认值

描述

eager

true

Camel 2.0: Eager 控制 Camel 是否在处理交换之前或之后将消息添加到存储库中。如果在之前启用,Camel 将能够检测到重复的消息,即使消息当前正在进行时也是如此。通过禁用 Camel,只有在成功处理消息时才会检测到重复的。

messageIdRepositoryRef

null

IdempotentRepository 的引用,以便在 registry 中查询。使用 XML DSL 时,这个选项是必须的。

skipDuplicate

true

Camel 2.8: 设置是否跳过重复消息。如果设为 false,则消息将继续。但是,“交换”一节 已通过将 Exchange.DUPLICATE_MESSAG Exchange 属性设置为 Boolean.TRUE 值来被标记为一个重复的。

completionEager

false

Camel 2.16: 完成交换时是否需要完成 Idempotent eager。

如果您设置了 completeEager 选项 true,那么当交换到达幂等消费者块的末尾时,Idempotent Consumer 会触发其完成。但是,如果交换在最终块后仍然可以被路由,那么它不会影响幂等消费者的状态。

如果您设置了 completeEager 选项 false,则 Idempotent Consumer 会在交换完成后触发其完成,并被路由。但是,如果交换在块结束后仍继续路由,那么它也会影响幂等消费者的状态。例如,由于交换失败导致异常,因此幂等消费者的状态将是回滚。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.