第 10 章 message Transformation


摘要

消息转换模式描述了如何修改消息的内容,以满足各种用途。

10.1. 内容增强

概述

内容增强 模式描述了一种场景,其中消息目的地需要比原始消息中存在的数据更多。在这种情况下,您将使用消息转换器、路由逻辑中的任意处理器,或者内容增强方法从外部资源拉取额外的数据。

图 10.1. 内容增强模式

内容增强模式

增强内容的替代方案

Apache Camel 支持多种增强内容:

  • 在路由逻辑中带有任意处理器的消息转换器
  • enrich () 方法通过将当前交换的副本发送到 制作者 端点,然后使用结果回复中的数据,从资源中获取其他数据。增强者创建的交换始终是一个 InOut 交换。
  • pollEnrich () 方法通过轮询 消费者 端点来获取数据。有效地是 pollEnrich () 操作中来自 main 路由和消费者端点的消费者端点。也就是说,路由中初始消费者上的传入消息会触发轮询消费者的 pollEnrich () 方法。
注意

enrich ()pollEnrich () 方法支持动态端点 URI。您可以通过指定一个表达式来计算 URI,以便您从当前交换获取值。例如,您可以使用从数据交换计算的名称来轮询文件。Camel 2.16 中引入了此行为。这个变化会破坏 XML DSL,并可让您轻松地迁移。Java DSL 保持向后兼容。

使用消息转换器和处理器功能丰富的内容

Camel 为使用类型安全 IDE 友好的方式创建路由和调解规则提供 流畅的 构建程序,提供智能完成并正在重构。当您测试分布式系统时,需要存存特定外部系统,以便您可以在特定系统可用或编写之前测试其他系统部分。执行此操作的一种方法是使用某种 模板 系统通过生成具有主要静态正文的动态消息来生成对请求的响应。使用模板的另一种方式是使用来自一个目的地的消息,将其转换为 VelocityXQuery,然后将其发送到另一个目的地。以下示例显示了一个 InOnly (单向)消息:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

假设您想要使用 InOut (request-reply)消息传递来处理 ActiveMQ 上的 My.Queue 队列上的请求。您希望模板生成的响应进入 JMSReplyTo 目的地。以下示例演示了如何进行此操作:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

以下简单示例演示了如何使用 DSL 转换消息正文:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

以下示例使用显式 Java 代码来添加处理器:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

下一个示例使用 bean 集成来启用使用任何 bean 作为转换器:

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

以下示例显示了 Spring XML 实施:

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>/>

使用 enrich ()方法增强内容

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

内容增强(丰富的 ) 从资源端点 检索其他数据,以便增强传入消息(包含在 组织交换中)。聚合策略将原始的交换与资源交换相结合。AggregationStrategy.aggregate (Exchange, Exchange) 方法的第一个参数与原始交换对应,第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。以下是实施您自己的聚合策略类的示例模板:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

使用此模板时,原始交换可以具有任何交换模式。增强器创建的资源交换始终是一个 InOut 交换。

Spring XML 丰富示例

前面的示例也可以在 Spring XML 中实施:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

在增强内容时的默认聚合策略

聚合策略是可选的。如果没有提供它,Apache Camel 将使用默认从资源获取的正文。例如:

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

在前面的路由中,发送到 direct:result 端点的消息包含来自 direct:resource 的输出,因为此示例不使用任何自定义聚合。

在 XML DSL 中,只需省略 policyRef 属性,如下所示:

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

enrich ()方法支持的选项

丰富的 DSL 命令支持以下选项:

Name

默认值

描述

expression

None

从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于将外部服务的 URI 配置为增强它。您可以使用 Simple 表达式语言、Constant 表达式语言或任何其他可动态计算当前交换值计算 URI 的语言。

uri

 

这些选项已被删除。指定 expression 选项替代。在 Camel 2.15 及更早版本中,需要 uri 选项或 ref 选项的规格。每个选项都指定外部服务的端点 URI,使其增强:

ref

 

引用外部服务的端点,使其增强:您必须使用 uriref

strategyRef

 

指的是用于将来自外部服务的回复合并到一个传出消息中。https://www.javadoc.io/doc/org.apache.camel/camel-core/2.23.2/org/apache/camel/processor/aggregate/AggregationStrategy.html默认情况下,Camel 使用外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

当使用 POJO 作为 AggregationStrategy 时,指定此选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认行为是,如果没有数据增强,则不会使用聚合方法。如果此选项为 true,则当没有数据无法增强时,则 null 值将用作 旧的Exchange,并且您使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式。

aggregateOnException

false

默认行为是,如果在尝试检索数据时抛出异常,则不会使用 聚合方法。将此选项设置为 true 可让最终用户控制聚合方法中异常 时要执行的操作。例如,可以阻止异常或设置自定义消息正文

shareUntOfWork

false

从 Camel 2.16 开始,默认行为是增强操作不会在父交换和资源交换之间共享工作单元。这意味着资源交换有自己的独立工作单元。如需更多信息,请参阅 Splitter 模式的文档。

cacheSize

1000

从 Camel 2.16 开始,指定这个选项来为 ProducerCache 配置缓存大小,它会缓存制作者以便在增强操作中重复使用。要关闭此缓存,请将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

从 Camel 2.16 开始,此选项指示是否忽略无法解析的端点 URI。默认行为是,Camel 会抛出一个标识无效端点 URI 的异常。

使用 enrich ()方法指定聚合策略

enrich () 方法从资源端点检索额外的数据,以增强传入消息,该消息包含在原始交换中。您可以使用聚合策略来组合原始的交换和资源交换。AggregationStrategy.aggregate (Exchange, Exchange) 方法的第一个参数与原始交换对应。第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。例如:

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

以下代码是实施聚合策略的模板。在使用此模板的实现中,原始交换可以是任何消息交换模式。增强器创建的资源交换始终是 InOut 消息交换模式。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

以下示例显示了使用 Spring XML DSL 来实现聚合策略:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

使用带有 enrich ()的动态 URI.

从 Camel 2.16 开始,Enrich ()pollEnrich () 方法支持使用根据当前交换的信息计算的动态 URI。例如,要增强 HTTP 端点,其中带有 orderId 键的标头用作 HTTP URL 的内容路径的一部分,您可以执行以下操作:

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

以下是 XML DSL 中的相同示例:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

使用 pollEnrich ()方法增强内容

pollEnrich 命令将资源端点视为 消费者。它不向资源端点发送交换,而是 轮询 端点。默认情况下,如果资源端点没有可用的交换,则轮询会立即返回。例如,以下路由读取从传入 JMS 消息的标头中提取名称的文件:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

您可以限制等待文件就绪的时间。以下示例显示了最多等待 20 秒:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

您还可以为 pollEnrich () 指定聚合策略,例如:

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

pollEnrich () 方法支持使用 consumer.bridgeErrorHandler=true 配置的使用者。这允许将轮询传播到路由错误处理程序的任何异常,例如重试轮询。

注意

consumer.bridgeErrorHandler=true 的支持在 Camel 2.18 中是新的。Camel 2.17 不支持此行为。

如果在接收交换前轮询超时,则传递给聚合策略的 aggregate () 方法的资源交换可能为 null

pollEnrich ()使用的轮询方法

pollEnrich () 方法通过调用以下轮询方法之一来轮询消费者端点:

  • receiveNoWait ()(这是默认值。)
  • receive()
  • receive (长超时)

pollEnrich () 命令的 timeout 参数(以毫秒为单位指定)决定要调用的方法,如下所示:

  • 当超时为 0 或未指定时,pollEnrich () 调用 receiveNoWait
  • 当超时为负数时,pollEnrich () 调用 接收
  • 否则,pollEnrich () 调用 receive (timeout)

如果没有数据,则聚合策略中的 newExchange 为 null。

使用 pollEnrich ()方法的示例

以下示例显示,通过从 inbox/data.txt 文件中加载内容来增强消息:

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

以下是 XML DSL 中的相同示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

如果指定的文件不存在,则消息为空。您可以指定一个超时时间(可能要等待,直到文件存在或等待特定时长)。在以下示例中,命令不会等待超过 5 秒:

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

使用带有 pollEnrich ()的动态 URI

从 Camel 2.16 开始,Enrich ()pollEnrich () 方法支持使用根据当前交换的信息计算的动态 URI。例如,若要从使用标头来指示 SEDA 队列名称的端点轮询丰富,您可以执行以下操作:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

以下是 XML DSL 中的相同示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich ()方法支持的选项

pollEnrich DSL 命令支持以下选项:

Name

默认值

描述

expression

None

从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于将外部服务的 URI 配置为增强它。您可以使用 Simple 表达式语言、Constant 表达式语言或任何其他可动态计算当前交换值计算 URI 的语言。

uri

 

这些选项已被删除。指定 expression 选项替代。在 Camel 2.15 及更早版本中,需要 uri 选项或 ref 选项的规格。每个选项都指定外部服务的端点 URI,使其增强:

ref

 

引用外部服务的端点,使其增强:您必须使用 uriref

strategyRef

 

指的是用于将来自外部服务的回复合并到一个传出消息中。https://www.javadoc.io/doc/org.apache.camel/camel-core/2.23.2/org/apache/camel/processor/aggregate/AggregationStrategy.html默认情况下,Camel 使用外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

当使用 POJO 作为 AggregationStrategy 时,指定此选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认行为是,如果没有数据增强,则不会使用聚合方法。如果此选项为 true,则当没有数据无法增强时,则 null 值将用作 旧的Exchange,并且您使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式。

timeout

-1

从外部服务轮询时,等待响应的最长时间(以毫秒为单位)。默认行为是 pollEnrich () 方法调用 receive () 方法。因为 receive () 可以阻止到有可用的消息,因此建议始终指定一个超时。

aggregateOnException

false

默认行为是,如果在尝试检索数据时抛出异常,则不会使用 聚合方法。将此选项设置为 true 可让最终用户控制聚合方法中异常 时要执行的操作。例如,可以阻止异常或设置自定义消息正文

cacheSize

1000

指定这个选项来为 ConsumerCache 配置缓存大小,它会缓存消费者以在 pollEnrich () 操作中重复使用。要关闭此缓存,请将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

指明是否忽略无法解析的端点 URI。默认行为是,Camel 会抛出一个标识无效端点 URI 的异常。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.