1.5. Processors
概述
要让路由器执行更有趣的事情,而只是将消费者端点连接到制作者端点,您可以将 处理器 添加到您的路由中。处理器是您可以插入到路由规则中的命令,以执行任意处理通过该规则的消息。Apache Camel 提供了各种不同的处理器,如 表 1.1 “Apache Camel Processors” 所示。
Java DSL | XML DSL | 描述 |
---|---|---|
|
| 第 8.5 节 “聚合器”: 创建一个聚合器,它将多个传入的交换合并到一个交换中。 |
|
| 使用 Aspect Oriented programming (AOP)在指定的子路由之前和之后工作。 |
|
| 通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “Bean 集成”。 |
|
|
第 8.1 节 “基于内容的路由器”: 使用 |
|
| 将 In message body 转换为指定的类型。 |
|
| 第 8.9 节 “Delayer”: 延迟交换传播到路由部分的传播。 |
|
|
使用 |
| N/A | 结束当前命令块。 |
|
| 第 10.1 节 “内容增强”: 将当前交换与指定 制作者 端点 URI 请求的数据相结合。 |
|
| 第 8.2 节 “Message Filter”: 使用 predicate 表达式来过滤传入的交换。 |
|
| 第 11.8 节 “idempotent Consumer”: 实施一个策略来阻止重复的消息。 |
|
| 布尔值选项,可用于禁用特定路由节点上的继承错误处理程序(定义为 Java DSL 中的子层,以及 XML DSL 中的属性)。 |
|
| 将当前的交换的 MEP 设置为 InOnly (如果没有参数),或者将交换作为 InOnly 发送到指定的端点。 |
|
| 将当前的交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。 |
|
| 第 8.10 节 “Load Balancer”: 通过一系列端点实施负载平衡。 |
|
| 将消息记录到控制台。 |
|
| 第 8.16 节 “loop”: 重复发送每个交换到路由的后一种部分。 |
|
|
(事务处理) 仅标记当前用于回滚的事务(不会引发异常)。在 XML DSL 中,这个选项被设置为 |
|
|
(事务) 如果之前与此线程关联了一个或多个事务,然后暂停,这个命令会标记最新的事务只进行回滚(不会引发异常)。在 XML DSL 中,这个选项被设置为 |
|
| 使用指定的数据格式转换为低级或二进制格式,以准备通过特定的传输协议发送。 |
|
| 第 8.13 节 “多播”: 多播当前交换到多个目的地,每个目标都会获得其自身的交换副本。 |
|
|
定义在主路由完成后执行的子路由(由 Java DSL 中的 |
|
|
定义在指定异常发生时执行的子路由(由 Java DSL 中的 |
|
| 第 5.4 节 “管道和过滤器”: 将交换发送到一系列端点,其中一个端点的输出成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”。 |
|
| 对当前路由应用策略(目前仅适用于事务策略),请参阅 Apache Karaf 事务指南。 |
|
| 第 10.1 节 “内容增强”: 将当前交换与从指定的 消费者 端点 URI 轮询的数据合并。 |
|
| 在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节 和 第 III 部分 “高级 Camel 编程”。 |
|
| 第 8.3 节 “接收者列表”: 将交换发送到在运行时计算的接收方列表(例如,基于标头的内容)。 |
|
| 从交换的 In 消息中删除指定的标头。 |
|
|
从交换的 In 消息中删除与指定模式匹配的标头。模式可以具有表单 |
|
| 从交换中删除指定的交换属性。 |
|
|
从交换中删除与指定模式匹配的属性。将以逗号分隔的 1 或更多字符串列表作为参数。第一个字符串是模式(请参阅上面的 |
|
| 第 8.6 节 “Resequencer”: 根据指定的 comparotor 操作对传入的交换进行排序。支持 批处理模式 和 流模式。 |
|
| (事务处理) 仅标记当前用于回滚的事务(默认为异常)。请参阅 Apache Karaf 交易指南。 |
|
| 第 8.7 节 “路由片段”: 根据从 slip 标头中提取的端点 URI 列表,通过动态构建的管道路由交换。 |
|
| 创建抽样节流,允许您从路由上的流量提取交换示例。 |
|
| 设置交换的 In 消息的消息正文。 |
|
| 将当前的交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节。 |
|
| 在交换的 In 消息中设置指定的标头。 |
|
| 在交换的 Out 消息中设置指定的标头。 |
|
| 设置指定的交换属性。 |
|
| 对 In 消息正文的内容排序(可以选择性地指定自定义比较器的位置)。 |
|
| 第 8.4 节 “Splitter”: 将当前交换分成一系列交换,每个分割交换都包含原始消息正文的片段。 |
|
| 停止路由当前交换并将其标记为 completed。 |
|
| 创建一个线程池,用于并发处理路由部分。 |
|
| 第 8.8 节 “Throttler”: 将流率限制为指定的级别(每秒交换数)。 |
|
| 抛出指定的 Java 异常。 |
|
| 将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”。 |
| N/A |
使用字符串格式化将交换发送到端点。也就是说,端点 URI 字符串可以嵌入 C |
|
| 创建一个 Spring 事务范围,其中包含路由的后者部分。请参阅 Apache Karaf 交易指南。 |
|
| 第 5.6 节 “消息 Translator”: 将 In message 标头复制到 Out message 标头,并将 Out message body 设置为指定的值。 |
|
| 使用指定的数据格式,将 In 消息正文从低级或二进制格式转换为高级别的格式。 |
|
|
取一个 predicate 表达式来测试当前消息是否有效。如果 predicate 返回 |
|
|
第 12.3 节 “wire Tap”: 使用 |
一些处理器示例
要了解如何在路由中使用处理器,请参阅以下示例:
choice
choice ()
处理器是一个条件语句,用于将传入消息路由到替代制作者端点。每个替代制作者端点都以 when ()
方法开头,此方法采用 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理将继续执行规则中的下一个 when ()
方法。例如,以下 choice ()
处理器将传入消息定向到 Target1、 Target2 或 Target3,具体取决于 Predicate1 和 Predicate2 的值:
from("SourceURL") .choice() .when(Predicate1).to("Target1") .when(Predicate2).to("Target2") .otherwise().to("Target3");
或者等效在 Spring XML 中:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <choice> <when> <!-- First predicate --> <simple>header.foo = 'bar'</simple> <to uri="Target1"/> </when> <when> <!-- Second predicate --> <simple>header.foo = 'manchu'</simple> <to uri="Target2"/> </when> <otherwise> <to uri="Target3"/> </otherwise> </choice> </route> </camelContext>
在 Java DSL 中,有一个特殊情况,您可能需要使用 endChoice ()
命令。某些标准的 Apache Camel 处理器允许您使用特殊的子层来指定额外的参数,从而有效地打开额外的嵌套级别,这通常由 end ()
命令终止。例如,您可以将负载均衡器子句指定为 loadBalance ().roundRobin ().to ("
,它会在 mock:foo 和 mock:foo
").to ("mock:bar").end ()mock:bar
端点之间平衡消息。如果负载均衡器子句嵌入在选择条件中,需要使用 endChoice ()
命令来终止子句,如下所示:
from("direct:start") .choice() .when(bodyAs(String.class).contains("Camel")) .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice() .otherwise() .to("mock:result");
Filter
filter ()
处理器可用于防止不常见的消息到达制作者端点。它接受单个 predicate 参数:如果 predicate 为 true,则允许消息交换到制作者;如果 predicate 为 false,则消息交换会被阻断。例如,以下过滤器会阻止消息交换,除非传入消息包含一个标题 foo
,值为 bar
:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
或者等效在 Spring XML 中:
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <filter> <simple>header.foo = 'bar'</simple> <to uri="TargetURL"/> </filter> </route> </camelContext>
Throttler
throttle ()
处理器确保制作者端点不会超载。throttler 的工作原理为:限制每秒可以传递的消息数量。如果传入的消息超过指定的速率,throttler 会在缓冲区中累积超出消息,并将其传送到制作者端点。例如,要将吞吐量限制为每秒 100 个信息,您可以定义以下规则:
from("SourceURL").throttle(100).to("TargetURL");
或者等效在 Spring XML 中:
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000"> <to uri="TargetURL"/> </throttle> </route> </camelContext>
自定义处理器
如果这里描述的标准处理器都没有提供您需要的功能,则始终可以定义自己的自定义处理器。要创建自定义处理器,请定义一个实施 org.apache.camel.Processor
接口的类,并覆盖 process ()
方法。以下自定义处理器 MyProcessor
会从传入消息中删除名为 foo
的标头:
例 1.3. 实施自定义处理器类
public class MyProcessor implements org.apache.camel.Processor { public void process(org.apache.camel.Exchange exchange) { inMessage = exchange.getIn(); if (inMessage != null) { inMessage.removeHeader("foo"); } } };
要将自定义处理器插入到路由器规则中,调用 process ()
方法,该方法为将处理器插入规则提供了通用机制。例如,以下规则调用 例 1.3 “实施自定义处理器类” 中定义的处理器:
org.apache.camel.Processor myProc = new MyProcessor(); from("SourceURL").process(myProc).to("TargetURL");