1.5. 处理器
概述
要让路由器做比将消费者端点连接到制作者端点相比,您可以向路由添加 处理器 更为有趣。处理器是一个命令,您可以插入到路由规则中,以执行通过该规则执行流的任意处理。Apache Camel 提供各种不同处理器,如 表 1.1 “Apache Camel Processors” 所示。
Java DSL | XML DSL | 描述 |
---|---|---|
|
| 第 8.5 节 “聚合器”: 创建一个聚合器,它将多个传入交换组合成一个交换。 |
|
| 使用 Aspect Oriented Programming (AOP)在指定子路由之前和之后工作。 |
|
| 通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “Bean 集成”。 |
|
|
第 8.1 节 “基于内容的路由器”:使用 when 和 |
|
| 将 In 消息正文转换为指定的类型。 |
|
| 第 8.9 节 “Delayer”: 将交换的传播延迟到路由的后部分。 |
|
|
创建用于处理异常的尝试/分散块,使用 |
| N/A | 结束当前命令块。 |
|
| 第 10.1 节 “内容增强”:将当前的交换与指定 制作者 端点 URI 请求的数据合并。 |
|
| 第 8.2 节 “消息过滤器”: 使用 predicate 表达式来过滤传入的交换。 |
|
| 第 11.8 节 “幂等的消费者”:实施策略以抑制重复消息。 |
|
| 布尔值选项,可用于在特定路由节点上禁用继承的错误处理程序(定义为 Java DSL 中的子使用和 XML DSL 中的属性)。 |
|
| 将当前交换的 MEP 设置为 仅限 (如果没有参数),或者将交换作为 仅限 给指定的端点。 |
|
| 将当前交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。 |
|
| 第 8.10 节 “Load Balancer”:对端点集合实施负载平衡。 |
|
| 将消息记录到控制台。 |
|
| 第 8.16 节 “循环”:重复将每个交换重新放置到路由的后部分。 |
|
|
(事务) 标记仅回滚的当前事务(不会引发异常)。在 XML DSL 中,此选项被设置为 |
|
|
(事务) 如果之前已与该线程关联了一个或多个事务,然后暂停,这个命令会标记仅回滚的最新事务(不会引发异常)。在 XML DSL 中,此选项被设置为 |
|
| 使用指定的数据格式转换为低级或二进制格式,以准备通过特定的传输协议发送。 |
|
| 第 8.13 节 “多播”:将当前交换到多个目的地,每个目的地获得自己的交换副本。 |
|
|
定义一个子路由(以 Java DSL |
|
|
定义 Java DSL 中的 |
|
| 第 5.4 节 “管道和过滤器”: 将交换发送到一系列端点,其中一个端点的输出将成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”。 |
|
| 将策略应用到当前路由(目前仅用于事务策略事项处理器)中,请参阅 Apache Karaf 事务指南。 |
|
| 第 10.1 节 “内容增强”: 将当前交换与来自指定的 消费者 端点 URI 的数据进行合并。 |
|
| 在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节 和 第 III 部分 “高级 Camel 编程”。 |
|
| 第 8.3 节 “接收者列表”: 将交换发送到运行时计算的接收者列表(例如,基于标头的内容)。 |
|
| 从交换的 In 消息中删除指定的标头。 |
|
|
从交换的 In 消息中删除与指定模式匹配的标头。这个模式可以采用 |
|
| 从交换删除指定的交换属性。 |
|
|
从交换中删除与指定模式匹配的属性。以逗号分隔的 1 或多个字符串列表作为参数。第一个字符串是模式(请参阅上面的 |
|
| 第 8.6 节 “Resequencer”: 根据指定比较器操作对传入交换进行重新排序。支持 批处理 模式和 流模式。 |
|
| (事务) 标记仅进行回滚的当前事务(此外,默认情况下也会增加异常)。请参阅 Apache Karaf 事务指南。 |
|
| 第 8.7 节 “路由 Slip”:根据从 slip 标头中提取的端点 URI 列表,通过构建的管道路由交换。 |
|
| 创建一个抽样限制,允许您从路由上的流量中提取交换示例。 |
|
| 设置交换的 In 消息的消息正文。 |
|
| 将当前交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节。 |
|
| 在交换的 In 消息中设置指定的标头。 |
|
| 设置交换的 Out 消息中指定的标头。 |
|
| 设置指定的交换属性。 |
|
| 对 In message body 的内容进行排序(其中可以选择性地指定自定义比较器)。 |
|
| 第 8.4 节 “Splitter”:将当前交换分成一系列交换,每个交换都包含原始消息正文的片段。 |
|
| 停止路由当前交换并将其标记为已完成。 |
|
| 创建一个线程池,用于对路由的后一部分进行并发处理。 |
|
| 第 8.8 节 “Throttler”: 将流率限制为指定级别(每秒交换数)。 |
|
| 抛出指定的 Java 异常. |
|
| 将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”。 |
| N/A |
使用字符串格式将交换发送到端点。也就是说,端点 URI 字符串可以嵌入 C |
|
| 创建一个 Spring 事务范围,其中包含该路由的后一部分。请参阅 Apache Karaf 事务指南。 |
|
| 第 5.6 节 “消息转换器”: 将 In message 标头复制到 Out message 标头,并将 Out 消息正文设置为指定的值。 |
|
| 使用指定的数据格式将 In 消息正文从低级或二进制格式转换为高级别的格式。 |
|
|
取一个 predicate 表达式来测试当前消息是否有效。如果 predicate 返回 |
|
|
第 12.3 节 “wire Tap”: 使用 |
有些示例处理器
要了解如何在路由中使用处理器,请查看以下示例:
选择
choice ()
处理器是一个条件语句,用于将传入消息路由到备选制作者端点。每个替代制作者端点的前面是一个 when ()
方法,该方法采用 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理规则中的下一个 when ()
方法。例如,以下 choice ()
处理器将传入的信息定向到 Target1、Target 2 或 Target3,具体取决于 Predicate1 和 Predicate2 的值:
from("SourceURL") .choice() .when(Predicate1).to("Target1") .when(Predicate2).to("Target2") .otherwise().to("Target3");
在 Spring XML 中,或等效于 Spring XML 中:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <choice> <when> <!-- First predicate --> <simple>header.foo = 'bar'</simple> <to uri="Target1"/> </when> <when> <!-- Second predicate --> <simple>header.foo = 'manchu'</simple> <to uri="Target2"/> </when> <otherwise> <to uri="Target3"/> </otherwise> </choice> </route> </camelContext>
在 Java DSL 中,有一个特殊的情形,您可能需要使用 endChoice ()
命令。某些标准 Apache Camel 处理器允许您使用特殊子使用指定额外参数,从而有效地打开额外的嵌套级别,该嵌套通常由 end ()
命令终止。例如,您可以将负载均衡器子指定为 loadBalance ().roundRobin ().to ("mock:foo").to ("mock:bar").end ()
,用于平衡 mock:foo
和 mock:bar
间的消息。但是,如果负载均衡器子嵌入在选择条件中,则需要使用 endChoice ()
命令终止 子句,如下所示:
from("direct:start") .choice() .when(bodyAs(String.class).contains("Camel")) .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice() .otherwise() .to("mock:result");
Filter
可以使用 filter ()
处理器来防止不间断消息到达制作者端点。它采用单个 predicate 参数:如果 predicate 为 true,则允许消息交换到制作者;如果 predicate 为 false,则消息交换会被阻断。例如,以下过滤器会阻断一个消息交换,除非传入的消息包含一个标头 foo
,其值等于 bar
:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
在 Spring XML 中,或等效于 Spring XML 中:
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <filter> <simple>header.foo = 'bar'</simple> <to uri="TargetURL"/> </filter> </route> </camelContext>
Throttler
throttle ()
处理器确保制作者端点不会过载。throttler 的工作原理是限制每秒可传递的消息数量。如果传入的消息超过指定率,则节流者会累积缓冲区中过量消息,并将它们传输速度会更慢于制作者端点。例如,要将吞吐量率限制为每秒 100 个信息,您可以定义以下规则:
from("SourceURL").throttle(100).to("TargetURL");
在 Spring XML 中,或等效于 Spring XML 中:
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000"> <to uri="TargetURL"/> </throttle> </route> </camelContext>
自定义处理器
如果此处描述的标准处理器不提供您需要的功能,您总是可以自行定义自定义处理器。要创建自定义处理器,可定义一个实施 org.apache.camel.Processor
接口的类,并覆盖 process ()
方法。以下自定义处理器 MyProcessor
会从传入信息中删除名为 foo
的标头:
例 1.3. 实施自定义处理器类
public class MyProcessor implements org.apache.camel.Processor { public void process(org.apache.camel.Exchange exchange) { inMessage = exchange.getIn(); if (inMessage != null) { inMessage.removeHeader("foo"); } } };
要将自定义处理器插入到路由器规则中,调用 process ()
方法,该方法为将处理器插入到规则的通用机制。例如,以下规则调用在 例 1.3 “实施自定义处理器类” 中定义的处理器:
org.apache.camel.Processor myProc = new MyProcessor(); from("SourceURL").process(myProc).to("TargetURL");