1.5. 处理器
概述
为了使路由器能够比简单地将消费者端点连接到生成者端点来执行更有趣的操作,您可以在路由中添加 处理器。处理器是一个命令,您可以插入到路由规则中,以执行通过该规则流的任意消息。Apache Camel 提供各种不同的处理器,如 表 1.1 “Apache Camel Processors” 所示。
Java DSL | XML DSL | 描述 |
---|---|---|
|
| 第 8.5 节 “聚合器”: 创建一个聚合器,它将多个传入交换合并为一个交换。 |
|
| 使用 Aspect Oriented programming (AOP)在指定的子路由之前和之后工作。 |
|
| 通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “Bean 集成”。 |
|
|
第 8.1 节 “基于内容的路由器”: 使用 |
|
| 将 In 消息正文转换为指定类型。 |
|
| 第 8.9 节 “Delayer”: 将交换传播到路由的后部分。 |
|
|
创建一个用于处理异常的 try/catch 块,使用 |
| 不适用 | 结束当前命令块。 |
|
| 第 10.1 节 “内容增强”: 将当前交换与从指定 制作者 端点 URI 请求的数据合并。 |
|
| 第 8.2 节 “消息过滤器”: 使用 predicate 表达式来过滤传入的交换。 |
|
| 第 11.8 节 “idempotent Consumer”: 实施策略以抑制重复消息。 |
|
| 布尔值选项,可用于禁用特定路由节点上继承的错误处理程序(定义为 Java DSL 中的子clause,以及 XML DSL 中的属性)。 |
|
| 将当前交换的 MEP 设置为 InOnly (如果没有参数),或者将交换作为 InOnly 发送到指定的端点。 |
|
| 将当前交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。 |
|
| 第 8.10 节 “Load Balancer”: 通过一组端点实施负载平衡。 |
|
| 将消息记录到控制台。 |
|
| 第 8.16 节 “loop”: 重复将每个交换重新发送到路由的后部分。 |
|
|
(事务处理) 仅标记当前回滚的事务(不会引发异常)。在 XML DSL 中,此选项被设置为 |
|
|
(事务处理) 如果之前与此线程关联了一个或多个事务,这个命令会暂停,这个命令会标记最新的回滚事务(不会引发异常)。在 XML DSL 中,此选项被设置为 |
|
| 使用指定的数据格式转换为低级或二进制格式,以准备通过特定的传输协议发送。 |
|
| 第 8.13 节 “多播”: 将当前交换广播到多个目的地,每个目的地获得自己的交换副本。 |
|
|
定义在主路由完成后执行的子路由(由 Java DSL 中的 |
|
|
定义在发生指定异常时执行的子路由(由 Java DSL 中的 |
|
| 第 5.4 节 “管道和过滤器”: 将交换发送到一系列端点,其中一个端点的输出成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”。 |
|
| 将策略应用到当前路由(目前仅用于事务策略)see Apache Karaf 事务指南。 |
|
| 第 10.1 节 “内容增强”: 将当前交换与从指定 消费者 端点 URI 轮询的数据合并。 |
|
| 在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节 和 第 III 部分 “高级 Camel 编程”。 |
|
| 第 8.3 节 “接收者列表”: 将交换发送到在运行时计算的接收方列表(例如,基于标头的内容)。 |
|
| 从交换的 In 消息中删除指定的标头。 |
|
|
从交换的 In 消息中删除与指定模式匹配的标头。模式可以有形式, |
|
| 从交换中删除指定的交换属性。 |
|
|
从交换中删除与指定模式匹配的属性。将以逗号分隔的 1 字符串列表作为参数。第一个字符串是模式(请参阅上面的 |
|
| 第 8.6 节 “Resequencer”: 根据指定比较器操作对传入的交换重新排序。支持 批处理模式 和 流模式。 |
|
| (事务处理) 仅标记当前回滚的事务(在默认情况下也会增加一个例外)。请参阅 Apache Karaf 事务指南。 |
|
| 第 8.7 节 “路由 Slip”: 根据从 slip 标头中提取的端点 URI 列表,通过管道动态地路由交换。 |
|
| 创建一个抽样节流,允许您从路由上的流量提取交换示例。 |
|
| 设置交换的 In 消息的消息正文。 |
|
| 将当前交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节。 |
|
| 在交换的 In 消息中设置指定的标头。 |
|
| 在交换的 Out 消息中设置指定的标头。 |
|
| 设置指定的交换属性。 |
|
| 对 In 消息正文的内容进行排序(可以指定自定义比较器)。 |
|
| 第 8.4 节 “Splitter”: 将当前交换分成一系列交换,其中每个分割交换包含原始消息正文的片段。 |
|
| 停止路由当前交换并将其标记为 completed。 |
|
| 创建一个线程池来并发处理路由的后部分。 |
|
| 第 8.8 节 “Throttler”: 将流率限制为指定的级别(每秒交换/秒)。 |
|
| 引发指定的 Java 异常。 |
|
| 将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”。 |
| 不适用 |
使用字符串格式将交换发送到端点。也就是说,端点 URI 字符串可以将替换嵌入到 C |
|
| 创建一个 Spring 事务范围,它将包括路由的后部分。请参阅 Apache Karaf 事务指南。 |
|
| 第 5.6 节 “message Translator”: 将 In 消息标头复制到 Out 消息标头,并将 Out 消息正文设置为指定的值。 |
|
| 使用指定的数据格式,将 In 消息正文从低级或二进制格式转换为高级别格式。 |
|
|
使用 predicate 表达式来测试当前消息是否有效。如果 predicate 返回 |
|
|
第 12.3 节 “wire Tap”: 使用 |
一些处理器示例
要了解如何在路由中使用处理器的一些想法,请参见以下示例:
选择
choice ()
处理器是一个条件语句,用于将传入的消息路由到替代制作者端点。每个替代制作者端点都以 when ()
方法开头,该方法采用 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理会进入规则中的下一个 when ()
方法。例如,以下 choice ()
处理器将传入的消息定向到 Target1、 Target2 或 Target3,具体取决于 Predicate1 和 Predicate2 的值:
from("SourceURL") .choice() .when(Predicate1).to("Target1") .when(Predicate2).to("Target2") .otherwise().to("Target3");
或者等同于 Spring XML:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <choice> <when> <!-- First predicate --> <simple>header.foo = 'bar'</simple> <to uri="Target1"/> </when> <when> <!-- Second predicate --> <simple>header.foo = 'manchu'</simple> <to uri="Target2"/> </when> <otherwise> <to uri="Target3"/> </otherwise> </choice> </route> </camelContext>
在 Java DSL 中,您可能需要使用 endChoice ()
命令。某些标准 Apache Camel 处理器允许您使用特殊的子库指定额外的参数,从而有效地打开额外的嵌套级别,该级别通常由 end ()
命令终止。例如,您可以将负载均衡器条款指定为 loadBalance ().roundRobin ().to ("
,它会在 mock:foo 和 mock:foo
").to ("mock:bar").end ()mock:bar
端点之间负载均衡消息。但是,如果负载均衡器句嵌入到选择条件中,则需要使用 endChoice ()
命令终止 子句,如下所示:
from("direct:start") .choice() .when(bodyAs(String.class).contains("Camel")) .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice() .otherwise() .to("mock:result");
Filter
filter ()
处理器可用于防止未中断消息到达制作者端点。它采用单个 predicate 参数:如果 predicate 为 true,则允许消息交换到制作者;如果 predicate 为 false,则消息交换会被阻止。例如,以下过滤器会阻止消息交换,除非传入的消息包含标头 foo
,其值等于 bar
:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
或者等同于 Spring XML:
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <filter> <simple>header.foo = 'bar'</simple> <to uri="TargetURL"/> </filter> </route> </camelContext>
Throttler
throttle ()
处理器确保生成者端点不会超载。throttler 的工作原理是通过限制每秒可传递的消息数量。如果传入的消息超过指定率,throttler 会在缓冲区中累积过量消息,并将其传输成生成者端点的速度。例如,要将吞吐量率限制为每秒 100 个消息,您可以定义以下规则:
from("SourceURL").throttle(100).to("TargetURL");
或者等同于 Spring XML:
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000"> <to uri="TargetURL"/> </throttle> </route> </camelContext>
自定义处理器
如果此处未描述的标准处理器都提供您需要的功能,则始终可以自行定义自定义处理器。要创建自定义处理器,请定义一个实施 org.apache.camel.Processor
接口的类,并覆盖 process ()
方法。以下自定义处理器 MyProcessor
会从传入的消息中删除名为 foo
的标头:
例 1.3. 实施自定义处理器类
public class MyProcessor implements org.apache.camel.Processor { public void process(org.apache.camel.Exchange exchange) { inMessage = exchange.getIn(); if (inMessage != null) { inMessage.removeHeader("foo"); } } };
要将自定义处理器插入到路由器规则中,可调用 process ()
方法,此方法提供了将处理器插入到规则的通用机制。例如,以下规则调用 例 1.3 “实施自定义处理器类” 中定义的处理器:
org.apache.camel.Processor myProc = new MyProcessor(); from("SourceURL").process(myProc).to("TargetURL");