搜索

8.6. Resequencer

download PDF

概述

图 8.7 “重新排序器模式” 中显示的 重新排序 器模式允许您根据顺序表达式重新排序信息。为 sequencing 表达式生成低的值的消息被移到批处理的前面,生成高值的消息将移到回来。

图 8.7. 重新排序器模式

重新排序器模式

Apache Camel 支持两个重新排序算法:

  • 批量重新排序 swig -wagon Collects 消息到批处理中,对消息进行排序并将其发送到其输出。
  • 根据消息间差距的检测,流重新排序 datas (continuous)消息流。

默认情况下,resequencer 不支持重复消息,并将只保留最后一条消息,当消息到达相同的消息表达式时。但是,在批处理模式中,您可以启用 resequencer 来允许重复。

批量重新排序

批处理重新排序算法默认为启用。例如,要根据 TimeStamp 标头中包含的时间戳值重新排序传入的消息,您可以在 Java DSL 中定义以下路由:

from("direct:start").resequence(header("TimeStamp")).to("mock:result");

默认情况下,通过收集到达间隔为 1000 毫秒的所有传入消息(默认 批处理超时),最多获取 100 个消息(默认 批处理大小)。您可以通过附加 batch () DSL 命令来自定义批处理超时和批处理大小,该命令使用 BatchResequencerConfig 实例作为其唯一参数。例如,要修改前面的路由,以便批处理由 4000 毫秒时间窗内收集的消息组成,最多 300 个消息,您可以定义 Java DSL 路由,如下所示:

import org.apache.camel.model.config.BatchResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300,4000L)).to("mock:result");
    }
};

您还可以使用 XML 配置指定批处理重新排序器模式。以下示例定义了一个批处理重新排序器,批处理大小为 300,批处理超时为 4000 毫秒:

<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <resequence>
      <!--
        batch-config can be omitted for default (batch) resequencer settings
      -->
      <batch-config batchSize="300" batchTimeout="4000" />
      <simple>header.TimeStamp</simple>
      <to uri="mock:result" />
    </resequence>
  </route>
</camelContext>

批处理选项

表 8.2 “批处理重新排序器选项” 仅显示批处理模式中可用的选项。

表 8.2. 批处理重新排序器选项
Java DSLXML DSLdefault描述

allowDuplicates ()

batch-config/@allowDuplicates

false

如果为 true,请不要丢弃来自批处理的重复消息(其中 重复 意味着消息表达式评估为同一值)。

reverse()

batch-config/@reverse

false

如果为 true,以相反的顺序放置消息(应用于消息表达式的默认排序基于 Java 的字符串字典顺序,如 String.compareTo ()定义)。

例如,如果要根据 JMSPriority 从 JMS 队列重新排序消息,则需要组合选项,允许Duplicates反向,如下所示:

from("jms:queue:foo")
        // sort by JMSPriority by allowing duplicates (message can have same JMSPriority)
        // and use reverse ordering so 9 is first output (most important), and 0 is last
        // use batch mode and fire every 3th second
        .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse()
        .to("mock:result");

流重新排序

要启用流重新排序算法,您必须将 stream () 附加到 resequence () DSL 命令中。例如,要根据 seqnum 标头中的序列号值重新排序传入的信息,您可以定义一个 DSL 路由,如下所示:

from("direct:start").resequence(header("seqnum")).stream().to("mock:result");

流处理重新排序器算法基于消息流中的差距检测,而不是对固定批处理的大小。差距检测与超时相结合,不再需要提前知道序列(即批处理大小)的消息数量。消息必须包含一个唯一的序列号,即前导者和成功符。例如,带有序列号 3 的消息带有序列号 2 的前身消息,以及序列号为 4 的后续消息。消息序列 2,3,5 存在一个差距,因为缺少 3 的后续。因此,重新排序必须保留消息 5,直到消息 4 到达(或超时发生)。

默认情况下,流重新排序器配置为 1000 毫秒的超时,最大消息容量为 100。要自定义流的超时和消息容量,您可以传递 StreamResequencerConfig 对象作为 stream () 的参数。例如,要配置一个流重新排序器,消息容量为 5000,超时为 4000 毫秒,您需要定义一个路由,如下所示:

// Java
import org.apache.camel.model.config.StreamResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("seqnum")).
            stream(new StreamResequencerConfig(5000, 4000L)).
            to("mock:result");
    }
};

如果消息流中连续消息(即带有相邻序列号的消息)之间的最大时间延迟已知,则 resequencer 的 timeout 参数应设置为这个值。在这种情况下,您可以保证流中的所有消息都会以正确的顺序发送到下一个处理器。与顺序时间差异相比的超时值较低,而重新排序器可能会从序列中传递信息。大型超时值应该获得足够的高容量值,其中使用 capacity 参数来防止 resequencer 耗尽内存。

如果要使用 以外的某些类型的序列号,您必须定义自定义比较器,如下所示:

// Java
ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator);
from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");

您还可以使用 XML 配置指定流重新排序器模式。以下示例定义了一个流重新排序器,消息容量为 5000,超时为 4000 毫秒:

<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <stream-config capacity="5000" timeout="4000"/>
      <simple>header.seqnum</simple>
      <to uri="mock:result" />
    </resequence>
  </route>
</camelContext>

忽略无效的交换

如果传入的交换不是有效的5-4会(例如,由于缺少标头,则重新排序表达式)会抛出 CamelExchangeException 异常(例如,由于缺少标头),则重新排序表达式。您可以使用 ignoreInvalidExchanges 选项忽略这些例外,这意味着 resequencer 将跳过任何无效的交换。

from("direct:start")
  .resequence(header("seqno")).batch().timeout(1000)
    // ignore invalid exchanges (they are discarded)
    .ignoreInvalidExchanges()
  .to("mock:result");

拒绝旧消息

rejectOld 选项可用于防止消息按顺序发送,无论用于重新排序消息的机制是什么。启用 rejectOld 选项后,resequencer 将拒绝传入消息(通过抛出 MessageRejectedException 异常),如果传入的消息比最近发送的消息 (如当前比较器定义)。

from("direct:start")
    .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
    .resequence(header("seqno")).stream().timeout(1000).rejectOld()
    .to("mock:result");
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.