11.8. idempotent Consumer


概述

幂等的消费者 模式用于过滤重复的消息。例如,假设一个方案:消息传递系统和消费者端点之间的连接会因为系统出现某种故障而丢失。如果消息传递系统在传输消息的中间处,这可能不明确,消费者是否收到最后一个消息。为提高交付可靠性,消息传递系统可能决定在重新建立连接后尽快采用此类消息。不幸的是,这涉及使用者可能会收到重复消息的风险,在某些情况下,复制消息的影响可能会带来不良的后果(比如从您的帐户中解放两下成本)。在这种情况下,可以使用幂等的消费者来判断消息流中的不必要的重复项。

Camel 提供以下 Idempotent Consumer 实现:

具有内存缓存的幂等消费者

在 Apache Camel 中,幂等 消费者模式由 idempotentConsumer () 处理器实现,它采用两个参数:

  • messageIdExpression criu-criu An 表达式,用于返回当前消息的消息 ID 字符串。
  • messageIdRepository mvapich-wagon A 引用消息 ID 存储库,该存储库存储收到的所有消息的 ID。

随着每条消息进入,幂等的消费者处理器会在存储库中查找当前的消息 ID,以查看此消息是否已看到。如果为 yes,则会丢弃该消息;如果没有,则允许消息传递,并将其 ID 添加到存储库中。

例 11.1 “使用内存中缓存过滤重复消息” 中显示的代码使用 TransactionID 标头来过滤重复项。

例 11.1. 使用内存中缓存过滤重复消息

import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
...
RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .idempotentConsumer(
             header("TransactionID"),
             memoryMessageIdRepository(200)
          ).to("seda:b");
    }
};

其中,调用 memoryMessageIdRepository (200) 创建一个内存缓存,最多可保存 200 个消息 ID。

您还可以使用 XML 配置定义幂等的消费者。例如,您可以在 XML 中定义前面的路由,如下所示:

<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <idempotentConsumer messageIdRepositoryRef="MsgIDRepos">
      <simple>header.TransactionID</simple>
      <to uri="seda:b"/>
    </idempotentConsumer>
  </route>
</camelContext>

<bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository">
    <!-- Specify the in-memory cache size. -->
    <constructor-arg type="int" value="200"/>
</bean>
注意

从 Camel 2.17,Idempotent Repository 支持可选的序列化标头。

使用 JPA 软件仓库的幂等消费者

内存中缓存会影响到易用内存不足且不在集群环境中工作的缺点。为克服这些缺点,您可以使用基于 Java 持久 API (ROX)的存储库。JPA 消息 ID 存储库使用面向对象的数据库来存储消息 ID。例如,您可以定义将 JPA 存储库用于幂等消费者的路由,如下所示:

import org.springframework.orm.jpa.JpaTemplate;

import org.apache.camel.spring.SpringRouteBuilder;
import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository;
...
RouteBuilder builder = new SpringRouteBuilder() {
    public void configure() {
        from("seda:a").idempotentConsumer(
          header("TransactionID"),
          jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName")
        ).to("seda:b");
    }
};

JPA 消息 ID 存储库通过两个参数进行初始化:

  • JpaTemplate 实例 iwl-busyboxProvides 可对 JPA 数据库进行处理。
  • 处理器名称 iwl-wagonId 会识别当前的幂等消费者处理器。

SpringRouteBuilder.bean () 方法是一种引用 Spring XML 文件中定义的 bean 的快捷方式。JpaTemplate bean 为底层 JPA 数据库提供了一个句柄。有关如何配置此 bean 的详情,请查看 JPA 文档。

有关设置 JPA 存储库的详情,请参阅 Camel JPA 单元测试中的 JPA 组件 文档、Spring JPA 文档和示例代码。

Spring XML 示例

以下示例使用 myMessageId 标头过滤出重复项:

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>

具有 JDBC 存储库的幂等消费者

JDBC 存储库也支持将消息 ID 存储在幂等消费者模式中。JDBC 存储库的实施由 SQL 组件提供,因此如果您使用的是 Maven 构建系统,则对 camel-sql 工件添加依赖项。

您可以使用 Spring persistence API 中的 SingleConnectionDataSource JDBC 打包程序类来实例化与 SQL 数据库的连接。例如,要实例化 JDBC 连接到 HyperSQL 数据库实例,您可以定义以下 JDBC 数据源:

<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
</bean>
注意

前面的 JDBC 数据源使用 HyperSQL mem 协议,它会创建一个仅限内存的数据库实例。这是 HyperSQL 数据库的 ay 实现,它 实际上并没有 持久的。

使用前面的数据源,您可以定义一个使用 JDBC 消息 ID 存储库的幂等消费者模式,如下所示:

<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
</bean>

<camel:camelContext>
	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
	</camel:errorHandler>

	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
		<camel:from uri="direct:start" />
		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
			<camel:header>messageId</camel:header>
			<camel:to uri="mock:result" />
		</camel:idempotentConsumer>
	</camel:route>
	</camel:camelContext>

如何处理路由中的重复消息

从 Camel 2.8 开始提供

现在,您可以将 skipDuplicate 选项设置为 false,它指示幂等的消费者也路由重复的信息。但是,通过将 “Exchanges”一节 设置为 true 的属性将重复消息被标记为重复。我们可以使用 第 8.1 节 “基于内容的路由器”第 8.2 节 “消息过滤器” 来检测这个问题并处理重复的信息。

例如,在以下示例中,使用 第 8.2 节 “消息过滤器” 将消息发送到重复的端点,然后停止继续路由该消息。

from("direct:start")
     // instruct idempotent consumer to not skip duplicates as we will filter then our self
     .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
     .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
         // filter out duplicate messages by sending them to someplace else and then stop
         .to("mock:duplicate")
         .stop()
     .end()
     // and here we process only new messages (no duplicates)
     .to("mock:result");

XML DSL 中的示例示例为:

 <!-- idempotent repository, just use a memory based for testing -->
 <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- we do not want to skip any duplicate messages -->
         <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false">
             <!-- use the messageId header as key for identifying duplicate messages -->
             <header>messageId</header>
             <!-- we will to handle duplicate messages using a filter -->
             <filter>
                 <!-- the filter will only react on duplicate messages, if this property is set on the Exchange -->
                 <property>CamelDuplicateMessage</property>
                 <!-- and send the message to this mock, due its part of an unit test -->
                 <!-- but you can of course do anything as its part of the route -->
                 <to uri="mock:duplicate"/>
                 <!-- and then stop -->
                 <stop/>
             </filter>
             <!-- here we route only new messages -->
             <to uri="mock:result"/>
         </idempotentConsumer>
     </route>
 </camelContext>

如何使用数据网格在集群环境中处理重复消息

如果您在集群环境中运行 Camel,则内存幂等存储库中无法正常工作(请参阅上述内容)。您可以设置中央数据库,或使用基于 Hazelcast 网格的幂等消费者实施。Hazelcast 查找节点通过多播(默认为 tcp-ip 配置 Hazelcast),并自动创建基于映射的存储库:

HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

您必须定义存储库应保存每个消息 id 的时长(默认为从中删除它)。为了避免内存不足,您应该根据 Hazelcast 配置创建 驱除策略。如需更多信息,请参阅 Hazelcast

请参阅此链接:http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent 仓库

教程] 了解如何使用 Apache Karaf 在两个集群节点上设置此类幂等存储库。

选项

Idempotent Consumer 具有以下选项:

选项

默认

描述

eager

true

Camel 2.0: Eager 控制 Camel 是否在处理交换之前或之后将消息添加到存储库中。如果在之前启用,Camel 将能够检测重复的消息,即使消息当前正在进行中。禁用 Camel 只会在成功处理消息时检测重复。

messageIdRepositoryRef

null

对要在 registry 中查找的 IdempotentRepository 的引用。使用 XML DSL 时此选项是必须的。

skipDuplicate

true

Camel 2.8: 设置是否跳过重复的消息。如果设置为 false,则消息将继续。但是,通过将 Exchange.DUPLICATE_MESSAG Exchange 属性设置为 Boolean.TRUE 值,将 “Exchanges”一节 标记为重复。

completionEager

false

Camel 2.16 : 设置在交换完成后是否要完成 Idempotent 消费者 eager。

如果您设置了 completeEager 选项 true,则当交换到达幂等消费者模式块的末尾时,Idempotent Consumer 会触发其完成。但是,如果交换在结束块后继续路由,则它不会影响幂等消费者的状态。

如果您设置了 completeEager 选项 false,则 Idempotent Consumer 会在交换完成后触发其完成,并被路由。但是,如果交换在块结束时继续路由,则它也会影响幂等消费者的状态。例如,由于交换失败而异常,幂等消费者的状态将是回滚。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.