1.5. 处理器


概述

为了使路由器能够比简单地将消费者端点连接到生成者端点来执行更有趣的操作,您可以在路由中添加 处理器。处理器是一个命令,您可以插入到路由规则中,以执行通过该规则流的任意消息。Apache Camel 提供各种不同的处理器,如 表 1.1 “Apache Camel Processors” 所示。

表 1.1. Apache Camel Processors
Java DSLXML DSL描述

aggregate()

聚合

第 8.5 节 “聚合器”: 创建一个聚合器,它将多个传入交换合并为一个交换。

aop()

aop

使用 Aspect Oriented programming (AOP)在指定的子路由之前和之后工作。

bean(), beanRef()

Bean

通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “Bean 集成”

choice()

选择

第 8.1 节 “基于内容的路由器”: 使用 whenotherwise 子句,根据交换内容选择特定的子路由。

convertBodyTo()

convertBodyTo

In 消息正文转换为指定类型。

delay()

delay

第 8.9 节 “Delayer”: 将交换传播到路由的后部分。

doTry()

doTry

创建一个用于处理异常的 try/catch 块,使用 doCatchdoFinallyend 子句。

end()

不适用

结束当前命令块。

enrich(),enrichRef()

增强

第 10.1 节 “内容增强”: 将当前交换与从指定 制作者 端点 URI 请求的数据合并。

filter()

filter

第 8.2 节 “消息过滤器”: 使用 predicate 表达式来过滤传入的交换。

idempotentConsumer()

idempotentConsumer

第 11.8 节 “idempotent Consumer”: 实施策略以抑制重复消息。

inheritErrorHandler()

@inheritErrorHandler

布尔值选项,可用于禁用特定路由节点上继承的错误处理程序(定义为 Java DSL 中的子clause,以及 XML DSL 中的属性)。

inOnly()

inOnly

将当前交换的 MEP 设置为 InOnly (如果没有参数),或者将交换作为 InOnly 发送到指定的端点。

inOut()

inOut

将当前交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。

loadBalance()

loadBalance

第 8.10 节 “Load Balancer”: 通过一组端点实施负载平衡。

log()

log

将消息记录到控制台。

loop()

loop

第 8.16 节 “loop”: 重复将每个交换重新发送到路由的后部分。

markRollbackOnly()

@markRollbackOnly

(事务处理) 仅标记当前回滚的事务(不会引发异常)。在 XML DSL 中,此选项被设置为 rollback 元素上的布尔值属性。请参阅 Apache Karaf 事务指南

markRollbackOnlyLast()

@markRollbackOnlyLast

(事务处理) 如果之前与此线程关联了一个或多个事务,这个命令会暂停,这个命令会标记最新的回滚事务(不会引发异常)。在 XML DSL 中,此选项被设置为 rollback 元素上的布尔值属性。请参阅 Apache Karaf 事务指南

marshal()

marshal

使用指定的数据格式转换为低级或二进制格式,以准备通过特定的传输协议发送。

multicast()

multicast

第 8.13 节 “多播”: 将当前交换广播到多个目的地,每个目的地获得自己的交换副本。

onCompletion()

onCompletion

定义在主路由完成后执行的子路由(由 Java DSL 中的 end () 结尾)。另请参阅 第 2.14 节 “OnCompletion”

onException()

onException

定义在发生指定异常时执行的子路由(由 Java DSL 中的 end () 结尾)。通常在其自身的行中(而非路由中)定义。

pipeline()

pipeline

第 5.4 节 “管道和过滤器”: 将交换发送到一系列端点,其中一个端点的输出成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”

policy()

policy

将策略应用到当前路由(目前仅用于事务策略)see Apache Karaf 事务指南

pollEnrich(),pollEnrichRef()

pollEnrich

第 10.1 节 “内容增强”: 将当前交换与从指定 消费者 端点 URI 轮询的数据合并。

process(),processRef

process

在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节第 III 部分 “高级 Camel 编程”

recipientList()

recipientList

第 8.3 节 “接收者列表”: 将交换发送到在运行时计算的接收方列表(例如,基于标头的内容)。

removeHeader()

removeHeader

从交换的 In 消息中删除指定的标头。

removeHeaders()

removeHeaders

从交换的 In 消息中删除与指定模式匹配的标头。模式可以有形式,prefix\* mvapich-wagonin,在这种情况下,它与以 prefixProductShortName-setuptoolsotherwise 开头的每个名称匹配,它将解释为正则表达式。

removeProperty()

removeProperty

从交换中删除指定的交换属性。

removeProperties()

removeProperties

从交换中删除与指定模式匹配的属性。将以逗号分隔的 1 字符串列表作为参数。第一个字符串是模式(请参阅上面的 removeHeaders () )。后续字符串指定例外 - 这些属性保留。

resequence()

重新排序

第 8.6 节 “Resequencer”: 根据指定比较器操作对传入的交换重新排序。支持 批处理模式流模式

rollback()

rollback

(事务处理) 仅标记当前回滚的事务(在默认情况下也会增加一个例外)。请参阅 Apache Karaf 事务指南

routingSlip()

routingSlip

第 8.7 节 “路由 Slip”: 根据从 slip 标头中提取的端点 URI 列表,通过管道动态地路由交换。

sample()

示例

创建一个抽样节流,允许您从路由上的流量提取交换示例。

setBody()

setBody

设置交换的 In 消息的消息正文。

setExchangePattern()

setExchangePattern

将当前交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节

setHeader()

setHeader

在交换的 In 消息中设置指定的标头。

setOutHeader()

setOutHeader

在交换的 Out 消息中设置指定的标头。

setProperty()

setProperty()

设置指定的交换属性。

sort()

sort

In 消息正文的内容进行排序(可以指定自定义比较器)。

split()

split

第 8.4 节 “Splitter”: 将当前交换分成一系列交换,其中每个分割交换包含原始消息正文的片段。

stop()

stop

停止路由当前交换并将其标记为 completed。

threads()

threads

创建一个线程池来并发处理路由的后部分。

throttle()

throttle

第 8.8 节 “Throttler”: 将流率限制为指定的级别(每秒交换/秒)。

throwException()

throwException

引发指定的 Java 异常。

to()

将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”

toF()

不适用

使用字符串格式将交换发送到端点。也就是说,端点 URI 字符串可以将替换嵌入到 C printf () 函数的样式中。

transacted()

transacted

创建一个 Spring 事务范围,它将包括路由的后部分。请参阅 Apache Karaf 事务指南

transform()

transform

第 5.6 节 “message Translator”: 将 In 消息标头复制到 Out 消息标头,并将 Out 消息正文设置为指定的值。

unmarshal()

unmarshal

使用指定的数据格式,将 In 消息正文从低级或二进制格式转换为高级别格式。

validate()

validate

使用 predicate 表达式来测试当前消息是否有效。如果 predicate 返回 false,则抛出 PredicateValidationException 异常。

wireTap()

wireTap

第 12.3 节 “wire Tap”: 使用 ExchangePattern.InOnly MEP,将当前交换的副本发送到指定的 wire tap URI。

一些处理器示例

要了解如何在路由中使用处理器的一些想法,请参见以下示例:

选择

choice () 处理器是一个条件语句,用于将传入的消息路由到替代制作者端点。每个替代制作者端点都以 when () 方法开头,该方法采用 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理会进入规则中的下一个 when () 方法。例如,以下 choice () 处理器将传入的消息定向到 Target1、 Target2Target3,具体取决于 Predicate1Predicate2 的值:

from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");

或者等同于 Spring XML:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

在 Java DSL 中,您可能需要使用 endChoice () 命令。某些标准 Apache Camel 处理器允许您使用特殊的子库指定额外的参数,从而有效地打开额外的嵌套级别,该级别通常由 end () 命令终止。例如,您可以将负载均衡器条款指定为 loadBalance ().roundRobin ().to (" mock:foo ").to ("mock:bar").end (),它会在 mock:foo 和 mock:bar 端点之间负载均衡消息。但是,如果负载均衡器句嵌入到选择条件中,则需要使用 endChoice () 命令终止 子句,如下所示:

from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");

Filter

filter () 处理器可用于防止未中断消息到达制作者端点。它采用单个 predicate 参数:如果 predicate 为 true,则允许消息交换到制作者;如果 predicate 为 false,则消息交换会被阻止。例如,以下过滤器会阻止消息交换,除非传入的消息包含标头 foo,其值等于 bar

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

或者等同于 Spring XML:

<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

Throttler

throttle () 处理器确保生成者端点不会超载。throttler 的工作原理是通过限制每秒可传递的消息数量。如果传入的消息超过指定率,throttler 会在缓冲区中累积过量消息,并将其传输成生成者端点的速度。例如,要将吞吐量率限制为每秒 100 个消息,您可以定义以下规则:

from("SourceURL").throttle(100).to("TargetURL");

或者等同于 Spring XML:

<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>

自定义处理器

如果此处未描述的标准处理器都提供您需要的功能,则始终可以自行定义自定义处理器。要创建自定义处理器,请定义一个实施 org.apache.camel.Processor 接口的类,并覆盖 process () 方法。以下自定义处理器 MyProcessor 会从传入的消息中删除名为 foo 的标头:

例 1.3. 实施自定义处理器类

public class MyProcessor implements org.apache.camel.Processor {
public void process(org.apache.camel.Exchange exchange) {
  inMessage = exchange.getIn();
  if (inMessage != null) {
      inMessage.removeHeader("foo");
  }
}
};

要将自定义处理器插入到路由器规则中,可调用 process () 方法,此方法提供了将处理器插入到规则的通用机制。例如,以下规则调用 例 1.3 “实施自定义处理器类” 中定义的处理器:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.