第 10 章 message Transformation
摘要
消息转换模式描述了如何修改消息的内容,以满足各种用途。
10.1. 内容增强
概述
内容增强 模式描述了一种场景,其中消息目的地需要比原始消息中存在的数据更多。在这种情况下,您将使用消息转换器、路由逻辑中的任意处理器,或者内容增强方法从外部资源拉取额外的数据。
图 10.1. 内容增强模式
增强内容的替代方案
Apache Camel 支持多种增强内容:
- 在路由逻辑中带有任意处理器的消息转换器
-
enrich ()
方法通过将当前交换的副本发送到 制作者 端点,然后使用结果回复中的数据,从资源中获取其他数据。增强者创建的交换始终是一个 InOut 交换。 -
pollEnrich ()
方法通过轮询 消费者 端点来获取数据。有效地是pollEnrich ()
操作中来自 main 路由和消费者端点的消费者端点。也就是说,路由中初始消费者上的传入消息会触发轮询消费者的pollEnrich ()
方法。
enrich ()
和 pollEnrich ()
方法支持动态端点 URI。您可以通过指定一个表达式来计算 URI,以便您从当前交换获取值。例如,您可以使用从数据交换计算的名称来轮询文件。Camel 2.16 中引入了此行为。这个变化会破坏 XML DSL,并可让您轻松地迁移。Java DSL 保持向后兼容。
使用消息转换器和处理器功能丰富的内容
Camel 为使用类型安全 IDE 友好的方式创建路由和调解规则提供 流畅的 构建程序,提供智能完成并正在重构。当您测试分布式系统时,需要存存特定外部系统,以便您可以在特定系统可用或编写之前测试其他系统部分。执行此操作的一种方法是使用某种 模板 系统通过生成具有主要静态正文的动态消息来生成对请求的响应。使用模板的另一种方式是使用来自一个目的地的消息,将其转换为 Velocity 或 XQuery,然后将其发送到另一个目的地。以下示例显示了一个 InOnly
(单向)消息:
from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm"). to("activemq:Another.Queue");
假设您想要使用 InOut (request-reply)消息传递来处理 ActiveMQ 上的 My.Queue
队列上的请求。您希望模板生成的响应进入 JMSReplyTo
目的地。以下示例演示了如何进行此操作:
from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm");
以下简单示例演示了如何使用 DSL 转换消息正文:
from("direct:start").setBody(body().append(" World!")).to("mock:result");
以下示例使用显式 Java 代码来添加处理器:
from("direct:start").process(new Processor() { public void process(Exchange exchange) { Message in = exchange.getIn(); in.setBody(in.getBody(String.class) + " World!"); } }).to("mock:result");
下一个示例使用 bean 集成来启用使用任何 bean 作为转换器:
from("activemq:My.Queue"). beanRef("myBeanName", "myMethodName"). to("activemq:Another.Queue");
以下示例显示了 Spring XML 实施:
<route> <from uri="activemq:Input"/> <bean ref="myBeanName" method="doTransform"/> <to uri="activemq:Output"/> </route>/>
使用 enrich ()方法增强内容
AggregationStrategy aggregationStrategy = ... from("direct:start") .enrich("direct:resource", aggregationStrategy) .to("direct:result"); from("direct:resource") ...
内容增强(丰富的 ) 从资源端点 检索其他数据,以便增强传入消息(包含在 组织交换中)。聚合策略将原始的交换与资源交换相结合。
AggregationStrategy.aggregate (Exchange, Exchange)
方法的第一个参数与原始交换对应,第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。以下是实施您自己的聚合策略类的示例模板:
public class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange original, Exchange resource) { Object originalBody = original.getIn().getBody(); Object resourceResponse = resource.getOut().getBody(); Object mergeResult = ... // combine original body and resource response if (original.getPattern().isOutCapable()) { original.getOut().setBody(mergeResult); } else { original.getIn().setBody(mergeResult); } return original; } }
使用此模板时,原始交换可以具有任何交换模式。增强器创建的资源交换始终是一个 InOut 交换。
Spring XML 丰富示例
前面的示例也可以在 Spring XML 中实施:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich strategyRef="aggregationStrategy"> <constant>direct:resource</constant> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />
在增强内容时的默认聚合策略
聚合策略是可选的。如果没有提供它,Apache Camel 将使用默认从资源获取的正文。例如:
from("direct:start") .enrich("direct:resource") .to("direct:result");
在前面的路由中,发送到 direct:result
端点的消息包含来自 direct:resource
的输出,因为此示例不使用任何自定义聚合。
在 XML DSL 中,只需省略 policyRef
属性,如下所示:
<route> <from uri="direct:start"/> <enrich uri="direct:resource"/> <to uri="direct:result"/> </route>
enrich ()方法支持的选项
丰富的
DSL 命令支持以下选项:
Name | 默认值 | 描述 |
| None | 从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于将外部服务的 URI 配置为增强它。您可以使用 Simple 表达式语言、Constant 表达式语言或任何其他可动态计算当前交换值计算 URI 的语言。 |
|
这些选项已被删除。指定 | |
|
引用外部服务的端点,使其增强:您必须使用 | |
|
指的是用于将来自外部服务的回复合并到一个传出消息中。https://www.javadoc.io/doc/org.apache.camel/camel-core/2.23.2/org/apache/camel/processor/aggregate/AggregationStrategy.html默认情况下,Camel 使用外部服务的回复作为传出消息。您可以使用 POJO 作为 | |
|
当使用 POJO 作为 | |
| false |
默认行为是,如果没有数据增强,则不会使用聚合方法。如果此选项为 true,则当没有数据无法增强时,则 null 值将用作 |
| false |
默认行为是,如果在尝试检索数据时抛出异常,则不会使用 聚合方法。将此选项设置为 |
| false | 从 Camel 2.16 开始,默认行为是增强操作不会在父交换和资源交换之间共享工作单元。这意味着资源交换有自己的独立工作单元。如需更多信息,请参阅 Splitter 模式的文档。 |
|
|
从 Camel 2.16 开始,指定这个选项来为 |
| false | 从 Camel 2.16 开始,此选项指示是否忽略无法解析的端点 URI。默认行为是,Camel 会抛出一个标识无效端点 URI 的异常。 |
使用 enrich ()方法指定聚合策略
enrich ()
方法从资源端点检索额外的数据,以增强传入消息,该消息包含在原始交换中。您可以使用聚合策略来组合原始的交换和资源交换。AggregationStrategy.aggregate (Exchange, Exchange)
方法的第一个参数与原始交换对应。第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out
消息中。例如:
AggregationStrategy aggregationStrategy = ... from("direct:start") .enrich("direct:resource", aggregationStrategy) .to("direct:result"); from("direct:resource") ...
以下代码是实施聚合策略的模板。在使用此模板的实现中,原始交换可以是任何消息交换模式。增强器创建的资源交换始终是 InOut 消息交换模式。
public class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange original, Exchange resource) { Object originalBody = original.getIn().getBody(); Object resourceResponse = resource.getIn().getBody(); Object mergeResult = ... // combine original body and resource response if (original.getPattern().isOutCapable()) { original.getOut().setBody(mergeResult); } else { original.getIn().setBody(mergeResult); } return original; } }
以下示例显示了使用 Spring XML DSL 来实现聚合策略:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich strategyRef="aggregationStrategy"> <constant>direct:resource</constant> </enrich> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />
使用带有 enrich ()的动态 URI.
从 Camel 2.16 开始,Enrich ()
和 pollEnrich ()
方法支持使用根据当前交换的信息计算的动态 URI。例如,要增强 HTTP 端点,其中带有 orderId
键的标头用作 HTTP URL 的内容路径的一部分,您可以执行以下操作:
from("direct:start") .enrich().simple("http:myserver/${header.orderId}/order") .to("direct:result");
以下是 XML DSL 中的相同示例:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich> <simple>http:myserver/${header.orderId}/order</simple> </enrich> <to uri="direct:result"/> </route>
使用 pollEnrich ()方法增强内容
pollEnrich
命令将资源端点视为 消费者。它不向资源端点发送交换,而是 轮询 端点。默认情况下,如果资源端点没有可用的交换,则轮询会立即返回。例如,以下路由读取从传入 JMS 消息的标头中提取名称的文件:
from("activemq:queue:order") .pollEnrich("file://order/data/additional?fileName=orderId") .to("bean:processOrder");
您可以限制等待文件就绪的时间。以下示例显示了最多等待 20 秒:
from("activemq:queue:order") .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds .to("bean:processOrder");
您还可以为 pollEnrich ()
指定聚合策略,例如:
.pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)
pollEnrich ()
方法支持使用 consumer.bridgeErrorHandler=true
配置的使用者。这允许将轮询传播到路由错误处理程序的任何异常,例如重试轮询。
对 consumer.bridgeErrorHandler=true
的支持在 Camel 2.18 中是新的。Camel 2.17 不支持此行为。
如果在接收交换前轮询超时,则传递给聚合策略的 aggregate ()
方法的资源交换可能为 null
。
pollEnrich ()使用的轮询方法
pollEnrich ()
方法通过调用以下轮询方法之一来轮询消费者端点:
-
receiveNoWait ()
(这是默认值。) -
receive()
-
receive (长超时)
pollEnrich ()
命令的 timeout 参数(以毫秒为单位指定)决定要调用的方法,如下所示:
-
当超时为
0
或未指定时,pollEnrich ()
调用receiveNoWait
。 -
当超时为负数时,
pollEnrich ()
调用接收
。 -
否则,
pollEnrich ()
调用receive (timeout)
。
如果没有数据,则聚合策略中的 newExchange
为 null。
使用 pollEnrich ()方法的示例
以下示例显示,通过从 inbox/data.txt
文件中加载内容来增强消息:
from("direct:start") .pollEnrich("file:inbox?fileName=data.txt") .to("direct:result");
以下是 XML DSL 中的相同示例:
<route> <from uri="direct:start"/> <pollEnrich> <constant>file:inbox?fileName=data.txt"</constant> </pollEnrich> <to uri="direct:result"/> </route>
如果指定的文件不存在,则消息为空。您可以指定一个超时时间(可能要等待,直到文件存在或等待特定时长)。在以下示例中,命令不会等待超过 5 秒:
<route> <from uri="direct:start"/> <pollEnrich timeout="5000"> <constant>file:inbox?fileName=data.txt"</constant> </pollEnrich> <to uri="direct:result"/> </route>
使用带有 pollEnrich ()的动态 URI
从 Camel 2.16 开始,Enrich ()
和 pollEnrich ()
方法支持使用根据当前交换的信息计算的动态 URI。例如,若要从使用标头来指示 SEDA 队列名称的端点轮询丰富,您可以执行以下操作:
from("direct:start") .pollEnrich().simple("seda:${header.name}") .to("direct:result");
以下是 XML DSL 中的相同示例:
<route> <from uri="direct:start"/> <pollEnrich> <simple>seda${header.name}</simple> </pollEnrich> <to uri="direct:result"/> </route>
pollEnrich ()方法支持的选项
pollEnrich
DSL 命令支持以下选项:
Name | 默认值 | 描述 |
| None | 从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于将外部服务的 URI 配置为增强它。您可以使用 Simple 表达式语言、Constant 表达式语言或任何其他可动态计算当前交换值计算 URI 的语言。 |
|
这些选项已被删除。指定 | |
|
引用外部服务的端点,使其增强:您必须使用 | |
|
指的是用于将来自外部服务的回复合并到一个传出消息中。https://www.javadoc.io/doc/org.apache.camel/camel-core/2.23.2/org/apache/camel/processor/aggregate/AggregationStrategy.html默认情况下,Camel 使用外部服务的回复作为传出消息。您可以使用 POJO 作为 | |
|
当使用 POJO 作为 | |
| false |
默认行为是,如果没有数据增强,则不会使用聚合方法。如果此选项为 true,则当没有数据无法增强时,则 null 值将用作 |
|
|
从外部服务轮询时,等待响应的最长时间(以毫秒为单位)。默认行为是 |
| false |
默认行为是,如果在尝试检索数据时抛出异常,则不会使用 聚合方法。将此选项设置为 |
|
|
指定这个选项来为 |
| false | 指明是否忽略无法解析的端点 URI。默认行为是,Camel 会抛出一个标识无效端点 URI 的异常。 |