8.6. Resequencer
概述
resequencer 模式(如 图 8.7 “重新排序模式” )可让您根据排序表达式重新排序信息。为 sequencing 表达式生成低值的消息将移到批处理前面,并且生成高值的消息将移到后端。
图 8.7. 重新排序模式
Apache Camel 支持两种重新排序算法:
- 批处理重新排序 Brightcove-IMG 收集消息到批处理中,对消息进行排序,并将其发送到其输出。
- 根据消息间存在空白的 流,流重新排序 Brightcove- insufficient-orders (持续)消息流。
默认情况下,resequencer 不支持重复消息,只有在消息到达同一邮件表达式时,才会保留最后一条消息。但是,在批处理模式中,您可以启用 resequencer 来允许重复。
批量排序
默认情况下启用批处理重新排序算法。例如,若要根据 TimeStamp
标头中包含的时间戳值重新排序传入的消息,您可以在 Java DSL 中定义以下路由:
from("direct:start").resequence(header("TimeStamp")).to("mock:result");
默认情况下,批处理通过收集达到 1000 毫秒的时间间隔(默认 批处理超时)的所有传入消息来获取,最高为 100 个消息(默认 批处理大小)。您可以通过附加 batch ()
DSL 命令自定义批处理超时和批处理大小的值,该命令将 BatchResequencerConfig
实例用作其唯一参数。例如,要修改上述路由,使批处理由在 4000 毫秒内收集的消息组成,最多 300 个消息,您可以按照如下所示定义 Java DSL 路由:
import org.apache.camel.model.config.BatchResequencerConfig; RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300,4000L)).to("mock:result"); } };
您还可以使用 XML 配置指定批处理重新排序模式。以下示例定义了批处理大小为 300 的批处理重新排序,批处理超时为 4000 毫秒:
<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <resequence> <!-- batch-config can be omitted for default (batch) resequencer settings --> <batch-config batchSize="300" batchTimeout="4000" /> <simple>header.TimeStamp</simple> <to uri="mock:result" /> </resequence> </route> </camelContext>
批处理选项
表 8.2 “批量排序选项” 仅显示批处理模式中可用的选项。
Java DSL | XML DSL | 默认 | 描述 |
---|---|---|---|
|
|
|
如果为 |
|
|
|
如果为 |
例如,如果要根据 JMSPriority
重新排序来自 JMS 队列的消息,则需要组合选项、allowDuplicates
和 reverse
,如下所示:
from("jms:queue:foo") // sort by JMSPriority by allowing duplicates (message can have same JMSPriority) // and use reverse ordering so 9 is first output (most important), and 0 is last // use batch mode and fire every 3th second .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse() .to("mock:result");
流重新排序
要启用流重新排序算法,您必须将 stream ()
附加到 resequence ()
DSL 命令。例如,要根据 seqnum
标头的序列号值重新排序传入的信息,您需要定义一个 DSL 路由,如下所示:
from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
流处理重新排序器算法基于消息流中差距的检测,而不是针对固定批处理大小。差距检测与超时结合使用,消除了需要预先了解序列的消息数(即批处理大小)的约束。消息必须包含已知前身和后续的唯一序列号。例如,序列号为 3
的消息带有数字 2
的前身消息,以及序列号为 4
的 successor 消息。消息序列 2,3,5
存在差距,因为后续 3
缺失。因此,重新排序必须保留信息 5
,直到消息 4
到达(或发生超时)。
默认情况下,流重新排序器配置超时为 1000 毫秒,最大消息容量为 100。要自定义流的超时和消息容量,您可以将 StreamResequencerConfig
对象作为参数传递到 stream ()
。例如,若要为配置流重新排序器,消息容量为 5000,超时为 4000 毫秒,您可以按照如下所示定义路由:
// Java import org.apache.camel.model.config.StreamResequencerConfig; RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start").resequence(header("seqnum")). stream(new StreamResequencerConfig(5000, 4000L)). to("mock:result"); } };
如果连续消息(即,带有相邻序列号的消息)在消息流中已知的最大时间延迟,则 resequencer 的 timeout 参数应设置为该值。在这种情况下,您可以保证流中的所有消息都以正确顺序发送到下一个处理器。与时间不足相比的超时值越低,重新排序率越高,重新排序器会从序列中传送消息。大型超时值应该完全高容量值,其中使用 capacity 参数来防止重新排序的内存耗尽。
如果要使用 长
以外的某些类型的序列号,您必须定义一个自定义比较器,如下所示:
// Java ExpressionResultComparator<Exchange> comparator = new MyComparator(); StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator); from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");
您还可以使用 XML 配置指定流排序模式。以下示例定义了消息容量为 5000 且超时为 4000 毫秒的流重新排序:
<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <resequence> <stream-config capacity="5000" timeout="4000"/> <simple>header.seqnum</simple> <to uri="mock:result" /> </resequence> </route> </camelContext>
忽略无效的交换
如果传入交换不是有效的 GROUPS,则重新排序 EIP 会抛出 CamelExchangeException
异常。也就是说,如果出于某种原因无法评估 sequencing 表达式(例如,因为缺少标头)。您可以使用 ignoreInvalidExchanges
选项忽略这些异常,这意味着重新排序程序将跳过任何无效的交换。
from("direct:start")
.resequence(header("seqno")).batch().timeout(1000)
// ignore invalid exchanges (they are discarded)
.ignoreInvalidExchanges()
.to("mock:result");
拒绝旧信息
rejectOld
选项可用于防止按顺序发送消息,无论用于重新排序消息的机制。启用 rejectOld
选项后,重新排序符拒绝传入的消息(通过引发 MessageRejectedException
异常),如果传入的消息是 旧的 (由当前比较器定义)与最后传送的消息一样。
from("direct:start")
.onException(MessageRejectedException.class).handled(true).to("mock:error").end()
.resequence(header("seqno")).stream().timeout(1000).rejectOld()
.to("mock:result");