搜索

8.4. Splitter

download PDF

概述

splitter 是将传入消息分成一系列传出消息的路由器类型。每个传出消息都包含一个原始消息。在 Apache Camel 中,图 8.4 “Splitter Pattern” 中显示的 splitter 模式由 split () Java DSL 命令实现。

图 8.4. Splitter Pattern

Splitter 模式

Apache Camel 分割程序实际上支持两种模式,如下所示:

  • 简单的 splitter HEKETI-wagon 实施其自身的 splitter 模式。
  • Splitter/aggregator 5-4-wagoncom 使用聚合器模式来组合分割器模式,以便在消息被处理后被重新发送。

在将原始消息分成多个部分之前,它会制作原始消息的重复副本。在 Show 副本中,原始消息的标头和有效负载将复制为仅引用。虽然分割程序本身不会将生成的消息部分路由到不同的端点,但分割消息的部分可能会处于二级路由中。

由于消息部分是应该有的副本,它们仍与原始消息相关联。因此,无法单独修改它们。如果要在将自定义逻辑路由到一组端点之前将自定义逻辑应用到消息部分的不同副本,您必须在 splitter 子句中使用 onPrepareRef DSL 选项来进行原始消息的深度副本。有关使用选项的详情,请参考 “选项”一节

Java DSL 示例

以下示例定义了从 seda:aseda:b 的路由,该路由通过将传入消息的每一行转换为单独的传出消息来分割消息:

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .split(bodyAs(String.class).tokenize("\n"))
          .to("seda:b");
    }
};

splitter 可以使用任何表达式语言,因此您可以使用任何支持的脚本语言(如 XPath、XQuery 或 SQL)分割信息(请参阅 第 II 部分 “路由表达式和 predicates 语言”)。以下示例从传入消息中提取 bar 元素,并将它们插入到单独的传出消息中:

from("activemq:my.queue")
  .split(xpath("//foo/bar"))
  .to("file://some/directory")

XML 配置示例

以下示例演示了如何使用 XPath 脚本语言在 XML 中配置 splitter 路由:

<camelContext id="buildSplitter" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="seda:a"/>
      <split>
        <xpath>//foo/bar</xpath>
        <to uri="seda:b"/>
      </split>
    </route>
</camelContext>

您可以使用 XML DSL 中的令牌表达式来通过令牌分割正文或标头,其中使用 tokenize 元素定义令牌表达式。在以下示例中,消息正文使用 \n 分隔符字符进行令牌。要使用正则表达式模式,请在 tokenize 项中设置 regex=true

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <split>
            <tokenize token="\n"/>
            <to uri="mock:result"/>
        </split>
    </route>
    </camelContext>

分成行组

要将大型文件分成 1000 行的块,您可以定义一个分割路由,如 Java DSL 所示:

from("file:inbox")
    .split().tokenize("\n", 1000).streaming()
       .to("activemq:queue:order");

令牌 化的 第二个参数指定应分组为单个块的行数。streaming () 子句指示分割者不会一次读取整个文件(如果文件较大),从而使整个文件性能显著提高。

可以在 XML DSL 中定义相同的路由,如下所示:

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="\n" group="1000"/>
    <to uri="activemq:queue:order"/>
  </split>
</route>

使用 group 选项时的输出始终是 java.lang.String 类型。

跳过第一项

要跳过消息中的第一个项目,您可以使用 skipFirst 选项。

在 Java DSL 中,在 tokenize 参数 true 中进行第三个选项:

from("direct:start")
 // split by new line and group by 3, and skip the very first element
      .split().tokenize("\n", 3, true).streaming()
         .to("mock:group");

可以在 XML DSL 中定义相同的路由,如下所示:

<route>
  <from uri="file:inbox"/>
    <split streaming="true">
    <tokenize token="\n" group="1000" skipFirst="true" />
    <to uri="activemq:queue:order"/>
  </split>
</route>

Splitter reply

如果进入 splitter 的交换具有 InOut message-exchange 模式(即预期回复),则分割器会返回原始输入消息的副本,作为 Out 消息插槽中的回复消息。您可以通过实施自己的 聚合策略来覆盖此默认行为

并行执行

如果要并行执行结果消息,您可以启用 parallel processing 选项,该选项可实例化线程池来处理消息片段。例如:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
from("activemq:my.queue").split(xPathBuilder).parallelProcessing().to("activemq:my.parts");

您可以自定义并行分割中使用的底层 ThreadPoolExecutor。例如,您可以在 Java DSL 中指定自定义 executor,如下所示:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
from("activemq:my.queue")
  .split(xPathBuilder)
  .parallelProcessing()
  .executorService(threadPoolExecutor)
  .to("activemq:my.parts");

您可以在 XML DSL 中指定自定义 executor,如下所示:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:parallel-custom-pool"/>
    <split executorServiceRef="threadPoolExecutor">
      <xpath>/invoice/lineItems</xpath>
      <to uri="mock:result"/>
    </split>
  </route>
</camelContext>

<bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
  <constructor-arg index="0" value="8"/>
  <constructor-arg index="1" value="16"/>
  <constructor-arg index="2" value="0"/>
  <constructor-arg index="3" value="MILLISECONDS"/>
  <constructor-arg index="4"><bean class="java.util.concurrent.LinkedBlockingQueue"/></constructor-arg>
</bean>

使用 bean 执行分割

由于 splitter 可以使用任何 表达式来执行拆分,因此您可以通过调用 method () 表达式来使用 bean 执行分割。bean 应该返回可取的值,例如: java.util.Collectionjava.util.Iterator 或数组。

以下路由定义了对 mySplitterBean bean 实例调用方法的 method () 表达式:

from("direct:body")
        // here we use a POJO bean mySplitterBean to do the split of the payload
        .split()
        .method("mySplitterBean", "splitBody")
        .to("mock:result");
from("direct:message")
        // here we use a POJO bean mySplitterBean to do the split of the message
        // with a certain header value
        .split()
        .method("mySplitterBean", "splitMessage")
        .to("mock:result");

其中 mySplitterBeanMySplitterBean 类的实例,它定义如下:

public class MySplitterBean {

    /**
     * The split body method returns something that is iteratable such as a java.util.List.
     *
     * @param body the payload of the incoming message
     * @return a list containing each part split
     */
    public List<String> splitBody(String body) {
        // since this is based on an unit test you can of couse
        // use different logic for splitting as {router} have out
        // of the box support for splitting a String based on comma
        // but this is for show and tell, since this is java code
        // you have the full power how you like to split your messages
        List<String> answer = new ArrayList<String>();
        String[] parts = body.split(",");
        for (String part : parts) {
            answer.add(part);
        }
        return answer;
    }

    /**
     * The split message method returns something that is iteratable such as a java.util.List.
     *
     * @param header the header of the incoming message with the name user
     * @param body the payload of the incoming message
     * @return a list containing each part split
     */
    public List<Message> splitMessage(@Header(value = "user") String header, @Body String body) {
        // we can leverage the Parameter Binding Annotations
        // http://camel.apache.org/parameter-binding-annotations.html
        // to access the message header and body at same time,
        // then create the message that we want, splitter will
        // take care rest of them.
        // *NOTE* this feature requires {router} version >= 1.6.1
        List<Message> answer = new ArrayList<Message>();
        String[] parts = header.split(",");
        for (String part : parts) {
            DefaultMessage message = new DefaultMessage();
            message.setHeader("user", part);
            message.setBody(body);
            answer.add(message);
        }
        return answer;
    }
}

您可以使用带有 Splitter EIP 的BeanIOSplitter 对象来分割大型有效负载,以避免将整个内容读到内存中。以下示例演示了如何使用映射文件设置 BeanIOSplitter 对象,该文件从 classpath 加载:

注意

BeanIOSplitter 类在 Camel 2.18 中是新的。Camel 2.17 不提供它。

BeanIOSplitter splitter = new BeanIOSplitter();
   splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml");
   splitter.setStreamName("employeeFile");

    // Following is a route that uses the beanio data format to format CSV data
    // in Java objects:
    from("direct:unmarshal")
        // Here the message body is split to obtain a message for each row:
         .split(splitter).streaming()
         .to("log:line")
         .to("mock:beanio-unmarshal");

以下示例添加了一个错误处理程序:

BeanIOSplitter splitter = new BeanIOSplitter();
   splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml");
   splitter.setStreamName("employeeFile");
   splitter.setBeanReaderErrorHandlerType(MyErrorHandler.class);
   from("direct:unmarshal")
      .split(splitter).streaming()
      .to("log:line")
      .to("mock:beanio-unmarshal");

Exchange 属性

每个分割交换都设置了以下属性:

headertypedescription

CamelSplitIndex

int

Apache Camel 2.0:每个被分割的交换增加的分割计数器。计数器从 0 开始。

CamelSplitSize

int

Apache Camel 2.0:分割交换的总数。此标头不适用于基于流的分割。

CamelSplitComplete

布尔值

Apache Camel 2.4:此交换是否是最后。

Splitter/aggregator 模式

在处理各个部分完成后,消息组件要聚合到单一交换中是一种常见的模式。要支持此模式,可以使用 split () DSL 命令提供 AggregationStrategy 对象作为第二个参数。

Java DSL 示例

以下示例演示了如何使用自定义聚合策略在处理所有消息片段后重新发送分割消息:

from("direct:start")
    .split(body().tokenize("@"), new MyOrderStrategy())
        // each split message is then send to this bean where we can process it
        .to("bean:MyOrderService?method=handleOrder")
        // this is important to end the splitter route as we do not want to do more routing
        // on each split message
    .end()
    // after we have split and handled each message we want to send a single combined
    // response back to the original caller, so we let this bean build it for us
    // this bean will receive the result of the aggregate strategy: MyOrderStrategy
    .to("bean:MyOrderService?method=buildCombinedResponse")

AggregationStrategy 实现

上述路由中使用的自定义聚合策略 MyOrderStrategy 如下:

/**
 * This is our own order aggregation strategy where we can control
 * how each split message should be combined. As we do not want to
 * lose any message, we copy from the new to the old to preserve the
 * order lines as long we process them
 */
public static class MyOrderStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // put order together in old exchange by adding the order from new exchange

        if (oldExchange == null) {
            // the first time we aggregate we only have the new exchange,
            // so we just return it
            return newExchange;
        }

        String orders = oldExchange.getIn().getBody(String.class);
        String newLine = newExchange.getIn().getBody(String.class);

        LOG.debug("Aggregate old orders: " + orders);
        LOG.debug("Aggregate new order: " + newLine);

        // put orders together separating by semi colon
        orders = orders + ";" + newLine;
        // put combined order back on old to preserve it
        oldExchange.getIn().setBody(orders);

        // return old as this is the one that has all the orders gathered until now
        return oldExchange;
    }
}

基于流的处理

启用并行处理后,理论上有可能在较早出现之前发生的情况之前,后续消息可能准备好进行聚合。换句话说,消息部分可能会到达聚合器没有顺序。默认情况下,这不会发生,因为分割实施将消息片段重新整理回其原始顺序,然后再将它们传递给聚合器。

如果您希望在消息就绪时聚合消息片段(可能按顺序),您可以启用 streaming 选项,如下所示:

from("direct:streaming")
  .split(body().tokenize(","), new MyOrderStrategy())
    .parallelProcessing()
    .streaming()
    .to("activemq:my.parts")
  .end()
  .to("activemq:all.parts");

您还可以提供用于流的自定义迭代程序,如下所示:

// Java
import static org.apache.camel.builder.ExpressionBuilder.beanExpression;
...
from("direct:streaming")
     .split(beanExpression(new MyCustomIteratorFactory(),  "iterator"))
     .streaming().to("activemq:my.parts")
流和 XPath

您不能与 XPath 结合使用流模式。XPath 需要在内存中的完整 DOM XML 文档。

使用 XML 进行基于流的处理

如果传入的消息是非常大的 XML 文件,您可以在流模式中使用 tokenizeXML 子命令来处理消息。

例如,如果一个含有一系列 order 元素的大型 XML 文件,您可以使用类似如下的路由将文件分成 顺序 元素:

from("file:inbox")
  .split().tokenizeXML("order").streaming()
  .to("activemq:queue:order");

您可以通过定义类似如下的路由在 XML 中执行同样的操作:

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="order" xml="true"/>
    <to uri="activemq:queue:order"/>
  </split>
</route>

通常,您需要访问在令牌元素的 enclosing (ancestor)元素中定义的命名空间。您可以通过指定您要从中继承命名空间定义的元素,将命名空间定义从上级元素复制到 token 元素中。

在 Java DSL 中,您将 ancestor 项指定为 tokenizeXML 的第二个参数。例如,要继承来自 enclosing orders 元素的命名空间定义:

from("file:inbox")
  .split().tokenizeXML("order", "orders").streaming()
  .to("activemq:queue:order");

在 XML DSL 中,您可以使用 inheritNamespaceTagName 属性指定 ancestor 元素。例如:

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="order"
              xml="true"
              inheritNamespaceTagName="orders"/>
    <to uri="activemq:queue:order"/>
  </split>
</route>

选项

split DSL 命令支持以下选项:

Name

默认值

描述

strategyRef

 

是指用于将子消息中的回复汇编为来自 第 8.4 节 “Splitter” 的单个传出消息的 AggregationStrategy。有关默认使用的内容 ,请参阅标题为以下的 splitter 返回的内容

strategyMethodName

 

当将 POJO 用作 AggregationStrategy 时,可以使用此选项明确指定要使用的方法名称。

strategyMethodAllowNull

false

当将 POJO 用作 AggregationStrategy 时,可以使用此选项。如果为 false 则不会使用聚合方法,当没有数据增强时。如果为 true,则空值用于 oldExchange,如果没有数据更丰富,则使用null 值。

parallelProcessing

false

如果启用,则同时处理子消息。请注意,调用者线程仍然会等待所有子消息都完全处理,然后再继续。

parallelAggregate

false

如果启用,则 AggregationStrategy 上的聚合方法可同时调用。请注意,这需要实施 AggregationStrategy 才能实现 thread-safe。默认情况下,此选项为 false,这意味着 Camel 会自动同步聚合方法 的调用。但是,在某些用例中,您可以通过将 AggregationStrategy 设为 thread-safe 来提高性能,并将此选项设置为 true

executorServiceRef

 

指的是用于并行处理的自定义线程池。请注意,如果您设置了这个选项,则并行处理是自动的,您不必启用该选项。

stopOnException

false

Camel 2.2 : 在出现异常情况时,是否立即停止继续处理。如果禁用,则 Camel 继续分割并处理子消息,无论它们是否失败。您可以在 AggregationStrategy 类中处理异常,您可以完全控制如何处理它。

streaming

false

如果启用,Camel 将以流方式分割,这意味着它将以块的形式分割输入信息。这可减少内存开销。例如,如果您分割大型消息,则建议启用流。如果启用了流,则子消息回复将按其顺序聚合,例如按其返回的顺序进行聚合。如果禁用,Camel 将以与分割方式相同的顺序处理子消息回复。

timeout

 

Camel 2.5: 设置在 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 无法分割和处理给定时间段内的所有回复,则超时触发器和 第 8.4 节 “Splitter” 中断并继续。请注意,如果您提供了一个 AggregationStrategy,则在中断前调用 timeout 方法。

onPrepareRef

 

Camel 2.8: 在处理前,请参阅自定义处理器准备交换的子消息。这可让您执行任何自定义逻辑,如在需要时深度获取消息有效负载。

shareUnitOfWork

false

Camel 2.8: 是否应共享工作单元。详情请查看以下。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.