1.5. 处理器
概述
要让路由器执行比简单地将消费者端点连接到制作者端点更有趣的事情,您可以将 处理器 添加到路由中。处理器是一个命令,您可以插入到路由规则中,以执行任何通过规则流的消息处理。Apache Camel 提供各种处理器,如 表 1.1 “Apache Camel Processors” 所示。
Java DSL | XML DSL | 描述 |
---|---|---|
|
| 第 8.5 节 “聚合器”:创建一个聚合器,将多个传入的交换组合到一个交换中。 |
|
| 使用 Aspect Oriented 编程(AOP)在指定的子路由之前和之后工作。 |
|
| 通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “bean Integration”。 |
|
|
第 8.1 节 “基于内容的路由器”:使用 |
|
| 将 In 消息正文转换为指定的类型。 |
|
| 第 8.9 节 “Delayer”:将交换的传播延迟到路由后面的部分。 |
|
|
创建用于处理异常的 try/catch 块,使用 |
| N/A | 结束当前命令块。 |
|
| 第 10.1 节 “内容增强器”:将当前交换与指定 制作者 端点 URI 请求的数据相结合。 |
|
| 第 8.2 节 “Message Filter”:使用 predicate 表达式来过滤传入的交换。 |
|
| 第 11.8 节 “幂等的消费者”:实施策略以抑制重复的消息。 |
|
| 布尔选项可用于禁用特定路由节点上继承的错误处理程序(定义为 Java DSL 中的子词,以及 XML DSL 中的属性)。 |
|
| 将当前的交换的 MEP 设置为 InOnly (如果没有参数),或者将交换作为 InOnly 发送到指定的端点。 |
|
| 将当前的交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。 |
|
| 第 8.10 节 “Load Balancer”:在一组端点上实施负载平衡。 |
|
| 将消息记录到控制台。 |
|
| 第 8.16 节 “loop”:重复将每个交换的后端到路由中的后方。 |
|
|
(事务) 只标记当前回滚的事务(不会引发异常)。在 XML DSL 中,此选项设置为 |
|
|
(事务) 如果之前已与这个线程关联一个或多个事务,然后暂停,这个命令只标记回滚的最新事务(不会引发异常)。在 XML DSL 中,此选项设置为 |
|
| 使用指定数据格式转换为低级或二进制格式,以准备通过特定传输协议发送。 |
|
| 第 8.13 节 “多播”:将当前交换多播到多个目的地,其中每个目标获得自己的交换副本。 |
|
|
定义在主路由完成后执行的子路由(以 |
|
|
定义在 Java DSL 中以 |
|
| 第 5.4 节 “pipes 和 Filters”:将交换发送到一系列端点,其中一个端点的输出成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”。 |
|
| 将策略应用到当前路由(目前仅用于事务性策略>_<),请参阅 Apache Karaf 事务指南。 |
|
| 第 10.1 节 “内容增强器”:将当前交换与从指定 消费者 端点 URI 轮询的数据合并。 |
|
| 在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节 和 第 III 部分 “高级 Camel 编程”。 |
|
| 第 8.3 节 “接收者列表”:将交换发送到在运行时计算的接收者列表(例如,基于标头的内容)。 |
|
| 从交换的 In 消息中删除指定的标头。 |
|
|
从交换的 In 消息中删除与指定模式匹配的标头。这个模式可以具有形式的 |
|
| 从交换中删除指定的交换属性。 |
|
|
从交换中删除与指定模式匹配的属性。将以逗号分隔的 1 个或多个字符串列表作为参数。第一个字符串是 模式(请参阅上面的 |
|
| 第 8.6 节 “Resequencer”: 按指定比较主操作对传入的交换进行重新排序。支持 批处理 模式和 流模式。 |
|
| (事务) 只标记当前仅回滚的事务(默认情况下也增加一个异常)。请参阅 Apache Karaf 事务指南。 |
|
| 第 8.7 节 “路由 Slip”:根据从 slip 标头提取的端点 URI 列表,通过动态构建的 Pipeline 路由交换。 |
|
| 创建一个抽样节流,允许您从路由上的流量中提取交换示例。 |
|
| 设置交换的 消息正文。 |
|
| 将当前的交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节。 |
|
| 在交换的 In 消息中设置指定的标头。 |
|
| 在 Exchange's Out 消息中设置指定的标头。 |
|
| 设置指定的交换属性。 |
|
| 对 In 消息正文的内容进行排序(其中可以指定自定义比较器)。 |
|
| 第 8.4 节 “Splitter”:将当前交换分成一系列交换,其中每个分割交换包含原始消息正文的片段。 |
|
| 可停止路由当前交换,并将其标记为 completed。 |
|
| 创建线程池,以并发处理路由中的后一部分。 |
|
| 第 8.8 节 “Throttler”:将流率限制为指定级别(每秒更改)。 |
|
| 引发指定的 Java 异常。 |
|
| 将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”。 |
| N/A |
使用字符串格式将交换发送到端点。也就是说,端点 URI 字符串可以在 C |
|
| 创建包含路由后者一部分的 Spring 事务范围。请参阅 Apache Karaf 事务指南。 |
|
| 第 5.6 节 “message Translator”:将 In 消息标头复制到 Out 消息标头,并将 Out 消息正文设置为指定的值。 |
|
| 使用指定数据格式将 In 消息正文从低级或二进制格式转换为高级别格式。 |
|
|
使用 predicate 表达式来测试当前消息是否有效。如果该 predicate 返回 |
|
|
第 12.3 节 “wire Tap”:使用 |
一些示例处理器
要了解如何在路由中使用处理器,请查看以下示例:
Choice
choice()
处理器是一个条件语句,用于将传入的消息路由到替代制作者端点。每个替代制作者端点的前面为一个 when()
方法,该方法使用一个 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理继续进行规则中的下一个 when()
方法。例如,以下 choice()
处理器将传入消息定向到 Target1、Target 2 或 Target3,具体取决于 Predicate1 和 Predicate2 的值:
from("SourceURL") .choice() .when(Predicate1).to("Target1") .when(Predicate2).to("Target2") .otherwise().to("Target3");
也可以在 Spring XML 中:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <choice> <when> <!-- First predicate --> <simple>header.foo = 'bar'</simple> <to uri="Target1"/> </when> <when> <!-- Second predicate --> <simple>header.foo = 'manchu'</simple> <to uri="Target2"/> </when> <otherwise> <to uri="Target3"/> </otherwise> </choice> </route> </camelContext>
在 Java DSL 中,有一个特殊情形,您可能需要使用 endChoice()
命令。某些标准 Apache Camel 处理器允许您使用特殊子使用指定额外参数,从而有效地打开额外的嵌套级别,这通常由 end()
命令终止。例如,您可以将负载均衡器子指定为 loadBalance().round SDin().to("mock:foo").to("
,它加载 balances 信息在 mock:bar
").end()模拟:foo
和模拟:bar 端点之间进行负载平衡。但是,如果负载均衡器条款嵌入到所选条件中,需要使用 endChoice()
命令终止子句,如下所示:
from("direct:start") .choice() .when(bodyAs(String.class).contains("Camel")) .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice() .otherwise() .to("mock:result");
Filter
filter()
处理器可用于防止不间断消息到达制作者端点。它取一个 predicate 参数:如果 predicate 为 true,则允许消息交换给制作者;如果 predicate 为 false,则会阻止消息交换。例如,以下过滤器将阻止消息交换,除非传入消息包含标题为 foo
,值设为 bar
:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
也可以在 Spring XML 中:
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <filter> <simple>header.foo = 'bar'</simple> <to uri="TargetURL"/> </filter> </route> </camelContext>
Throttler
throttle()
处理器确保制作者端点不会超载。throttler 通过限制每秒可传递的消息数量来实现。如果传入的消息超过指定率,则节流器会在缓冲区中累积过量消息,并将它们更慢地传输到制作者端点。例如,要将吞吐量率限制为每秒 100 个消息,您可以定义以下规则:
from("SourceURL").throttle(100).to("TargetURL");
也可以在 Spring XML 中:
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000"> <to uri="TargetURL"/> </throttle> </route> </camelContext>
自定义处理器
如果此处描述的标准处理器都不提供您需要的功能,您可以始终定义自己的自定义处理器。要创建自定义处理器,请定义一个实施 org.apache.camel.Processor
接口的类,并覆盖 process()
方法。以下自定义处理器 MyProcessor
会从传入信息中删除名为 foo
的标头:
例 1.3. 实施自定义处理器类
public class MyProcessor implements org.apache.camel.Processor { public void process(org.apache.camel.Exchange exchange) { inMessage = exchange.getIn(); if (inMessage != null) { inMessage.removeHeader("foo"); } } };
要将自定义处理器插入到路由器规则中,调用 process()
方法,它为将处理器插入到规则中提供了通用机制。例如,以下规则调用 例 1.3 “实施自定义处理器类” 中定义的处理器:
org.apache.camel.Processor myProc = new MyProcessor(); from("SourceURL").process(myProc).to("TargetURL");