1.5. Processors


概述

要让路由器执行更有趣的事情,而只是将消费者端点连接到制作者端点,您可以将 处理器 添加到您的路由中。处理器是您可以插入到路由规则中的命令,以执行任意处理通过该规则的消息。Apache Camel 提供了各种不同的处理器,如 表 1.1 “Apache Camel Processors” 所示。

表 1.1. Apache Camel Processors
Java DSLXML DSL描述

aggregate()

聚合

第 8.5 节 “聚合器”: 创建一个聚合器,它将多个传入的交换合并到一个交换中。

aop ()

aop

使用 Aspect Oriented programming (AOP)在指定的子路由之前和之后工作。

bean(), beanRef()

Bean

通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “Bean 集成”

choice()

choice

第 8.1 节 “基于内容的路由器”: 使用 whenotherwise 子句,根据交换内容选择特定的子路由。

convertBodyTo()

convertBodyTo

In message body 转换为指定的类型。

delay ()

delay

第 8.9 节 “Delayer”: 延迟交换传播到路由部分的传播。

doTry()

doTry

使用 doCatchdoFinallyend 子句创建用于处理异常的尝试/请求块。

end()

N/A

结束当前命令块。

enrich(),enrichRef()

enrich

第 10.1 节 “内容增强”: 将当前交换与指定 制作者 端点 URI 请求的数据相结合。

filter ()

filter

第 8.2 节 “Message Filter”: 使用 predicate 表达式来过滤传入的交换。

idempotentConsumer()

idempotentConsumer

第 11.8 节 “idempotent Consumer”: 实施一个策略来阻止重复的消息。

inheritErrorHandler()

@inheritErrorHandler

布尔值选项,可用于禁用特定路由节点上的继承错误处理程序(定义为 Java DSL 中的子层,以及 XML DSL 中的属性)。

inOnly()

InOnly

将当前的交换的 MEP 设置为 InOnly (如果没有参数),或者将交换作为 InOnly 发送到指定的端点。

inOut()

inOut

将当前的交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。

loadBalance()

loadBalance

第 8.10 节 “Load Balancer”: 通过一系列端点实施负载平衡。

log()

log

将消息记录到控制台。

loop()

loop

第 8.16 节 “loop”: 重复发送每个交换到路由的后一种部分。

markRollbackOnly()

@markRollbackOnly

(事务处理) 仅标记当前用于回滚的事务(不会引发异常)。在 XML DSL 中,这个选项被设置为 rollback 元素的布尔值属性。请参阅 Apache Karaf 交易指南

markRollbackOnlyLast()

@markRollbackOnlyLast

(事务) 如果之前与此线程关联了一个或多个事务,然后暂停,这个命令会标记最新的事务只进行回滚(不会引发异常)。在 XML DSL 中,这个选项被设置为 rollback 元素的布尔值属性。请参阅 Apache Karaf 交易指南

marshal()

marshal

使用指定的数据格式转换为低级或二进制格式,以准备通过特定的传输协议发送。

multicast()

multicast

第 8.13 节 “多播”: 多播当前交换到多个目的地,每个目标都会获得其自身的交换副本。

onCompletion()

onCompletion

定义在主路由完成后执行的子路由(由 Java DSL 中的 end () 结尾)。另请参阅 第 2.14 节 “OnCompletion”

onException()

onException

定义在指定异常发生时执行的子路由(由 Java DSL 中的 end () 结尾)。通常在其自己的行中(不在路由中定义)。

pipeline()

pipeline

第 5.4 节 “管道和过滤器”: 将交换发送到一系列端点,其中一个端点的输出成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”

policy()

policy

对当前路由应用策略(目前仅适用于事务策略),请参阅 Apache Karaf 事务指南

pollEnrich(),pollEnrichRef()

pollEnrich

第 10.1 节 “内容增强”: 将当前交换与从指定的 消费者 端点 URI 轮询的数据合并。

process(),processRef

process

在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节第 III 部分 “高级 Camel 编程”

recipientList()

recipientList

第 8.3 节 “接收者列表”: 将交换发送到在运行时计算的接收方列表(例如,基于标头的内容)。

removeHeader()

removeHeader

从交换的 In 消息中删除指定的标头。

removeHeaders ()

removeHeaders

从交换的 In 消息中删除与指定模式匹配的标头。模式可以具有表单 prefix\* HEKETI-rhacmin,在这种情况下,它与以 prefix swig-wagonotherwise 开头的每个名称匹配,它被解释为正则表达式。

removeProperty()

removeProperty

从交换中删除指定的交换属性。

removeProperties()

removeProperties

从交换中删除与指定模式匹配的属性。将以逗号分隔的 1 或更多字符串列表作为参数。第一个字符串是模式(请参阅上面的 removeHeaders () )。后续字符串指定例外 - 这些属性保留。

resequence()

resequence

第 8.6 节 “Resequencer”: 根据指定的 comparotor 操作对传入的交换进行排序。支持 批处理模式流模式

rollback()

rollback

(事务处理) 仅标记当前用于回滚的事务(默认为异常)。请参阅 Apache Karaf 交易指南

routingSlip()

routingSlip

第 8.7 节 “路由片段”: 根据从 slip 标头中提取的端点 URI 列表,通过动态构建的管道路由交换。

sample()

示例

创建抽样节流,允许您从路由上的流量提取交换示例。

setBody()

setBody

设置交换的 In 消息的消息正文。

setExchangePattern()

setExchangePattern

将当前的交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节

setHeader()

setHeader

在交换的 In 消息中设置指定的标头。

setOutHeader()

setOutHeader

在交换的 Out 消息中设置指定的标头。

setProperty()

setProperty()

设置指定的交换属性。

sort()

排序

In 消息正文的内容排序(可以选择性地指定自定义比较器的位置)。

split()

split

第 8.4 节 “Splitter”: 将当前交换分成一系列交换,每个分割交换都包含原始消息正文的片段。

stop()

stop

停止路由当前交换并将其标记为 completed。

threads ()

threads

创建一个线程池,用于并发处理路由部分。

throttle()

throttle

第 8.8 节 “Throttler”: 将流率限制为指定的级别(每秒交换数)。

throwException()

throwException

抛出指定的 Java 异常。

to ()

将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”

toF()

N/A

使用字符串格式化将交换发送到端点。也就是说,端点 URI 字符串可以嵌入 C printf () 函数样式中的替换。

transacted()

Transacted

创建一个 Spring 事务范围,其中包含路由的后者部分。请参阅 Apache Karaf 交易指南

transform()

转换

第 5.6 节 “消息 Translator”: 将 In message 标头复制到 Out message 标头,并将 Out message body 设置为指定的值。

unmarshal()

unmarshal

使用指定的数据格式,将 In 消息正文从低级或二进制格式转换为高级别的格式。

validate ()

validate

取一个 predicate 表达式来测试当前消息是否有效。如果 predicate 返回 false,则抛出 PredicateValidationException 异常。

wireTap()

wireTap

第 12.3 节 “wire Tap”: 使用 ExchangePattern.InOnly MEP 将当前交换的副本发送到指定的 wire tap URI。

一些处理器示例

要了解如何在路由中使用处理器,请参阅以下示例:

choice

choice () 处理器是一个条件语句,用于将传入消息路由到替代制作者端点。每个替代制作者端点都以 when () 方法开头,此方法采用 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理将继续执行规则中的下一个 when () 方法。例如,以下 choice () 处理器将传入消息定向到 Target1、 Target2Target3,具体取决于 Predicate1Predicate2 的值:

from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");

或者等效在 Spring XML 中:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

在 Java DSL 中,有一个特殊情况,您可能需要使用 endChoice () 命令。某些标准的 Apache Camel 处理器允许您使用特殊的子层来指定额外的参数,从而有效地打开额外的嵌套级别,这通常由 end () 命令终止。例如,您可以将负载均衡器子句指定为 loadBalance ().roundRobin ().to (" mock:foo ").to ("mock:bar").end (),它会在 mock:foo 和 mock:bar 端点之间平衡消息。如果负载均衡器子句嵌入在选择条件中,需要使用 endChoice () 命令来终止子句,如下所示:

from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");

Filter

filter () 处理器可用于防止不常见的消息到达制作者端点。它接受单个 predicate 参数:如果 predicate 为 true,则允许消息交换到制作者;如果 predicate 为 false,则消息交换会被阻断。例如,以下过滤器会阻止消息交换,除非传入消息包含一个标题 foo,值为 bar:

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

或者等效在 Spring XML 中:

<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

Throttler

throttle () 处理器确保制作者端点不会超载。throttler 的工作原理为:限制每秒可以传递的消息数量。如果传入的消息超过指定的速率,throttler 会在缓冲区中累积超出消息,并将其传送到制作者端点。例如,要将吞吐量限制为每秒 100 个信息,您可以定义以下规则:

from("SourceURL").throttle(100).to("TargetURL");

或者等效在 Spring XML 中:

<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>

自定义处理器

如果这里描述的标准处理器都没有提供您需要的功能,则始终可以定义自己的自定义处理器。要创建自定义处理器,请定义一个实施 org.apache.camel.Processor 接口的类,并覆盖 process () 方法。以下自定义处理器 MyProcessor 会从传入消息中删除名为 foo 的标头:

例 1.3. 实施自定义处理器类

public class MyProcessor implements org.apache.camel.Processor {
public void process(org.apache.camel.Exchange exchange) {
  inMessage = exchange.getIn();
  if (inMessage != null) {
      inMessage.removeHeader("foo");
  }
}
};

要将自定义处理器插入到路由器规则中,调用 process () 方法,该方法为将处理器插入规则提供了通用机制。例如,以下规则调用 例 1.3 “实施自定义处理器类” 中定义的处理器:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.