第 37 章 制作者和消费者模板
摘要
Apache Camel 中的制作者和消费者模板在 Spring 容器 API 的功能后进行建模,通过简化的、易于使用的 API (称为 模板 )来提供对资源的访问。对于 Apache Camel,生产者模板和使用者模板提供了简化的接口,用于将消息发送到制作者端点和消费者端点并接收消息。
37.1. 使用 Producer 模板
37.1.1. Producer 模板简介
概述
producer 模板支持各种方法来调用制作者端点。有的方法支持针对请求消息的不同格式(作为 Exchange
对象,作为消息正文,作为单一标题设置的消息正文),以及支持同步和异步调用方式。总体而言,生产者模板方法可以划分为以下类别:
另外,请参阅 第 37.2 节 “使用 Fluent Producer 模板”。
同步调用
异步调用端点的方法具有以 发送Suffix()
形式的名称,并请求Suffix
()。例如,使用默认消息交换模式(MEP)或显式指定的 MEP 来调用端点的方法命名为 send ()
、sendBody ()
和 sendBodyAndHeader ()
(其中,这些方法分别发送 Exchange
对象、消息正文或消息正文和标头值)。如果要强制使用 MEP 为 InOut (request/reply语义s),您可以调用 request ()、
requestBody ()
和 requestBodyAndHeader ()
方法。
以下示例演示了如何创建 ProducerTemplate
实例,并使用它来向 activemq:MyQueue
端点发送消息正文。这个示例还演示了如何使用 sendBodyAndHeader ()
发送消息正文和标头值。
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue template.sendBody("activemq:MyQueue", "<hello>world!</hello>"); // Send with a body and header template.sendBodyAndHeader( "activemq:MyQueue", "<hello>world!</hello>", "CustomerRating", "Gold" );
使用处理器同步调用
同步调用的一个特殊情形是,您使用 Processor
参数而不是 Exchange
参数提供 send ()
方法。在本例中,生产者模板隐式要求指定端点以创建 Exchange
实例(通常,但默认情况下不始终使用 InOnly MEP)。然后,此默认交换会传递到处理器,这会初始化交换对象的内容。
以下示例演示了如何将 MyProcessor
处理器初始化的交换发送到 activemq:MyQueue
端点。
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue, using a processor to initialize template.send("activemq:MyQueue", new MyProcessor());
MyProcessor
类的实施如下例中所示。除了设置 In 消息正文(如此处所示)外,您还可以初始化消息标题和交换属性。
import org.apache.camel.Processor; import org.apache.camel.Exchange; ... public class MyProcessor implements Processor { public MyProcessor() { } public void process(Exchange ex) { ex.getIn().setBody("<hello>world!</hello>"); } }
异步调用
异步 调用端点的方法具有组成 asyncSendSuffix()
和 asyncRequestSuffix()的格式
。例如,使用默认消息交换模式(MEP)或显式指定的 MEP 调用端点的方法命名为 asyncSend ()
和 asyncSendBody ()
(其中,这些方法分别发送 Exchange
对象或消息正文)。如果要强制使用 MEP 为 InOut (request/reply语义s),您可以调用 asyncRequestBody ()、Async
RequestBodyAndHeader ()
和 asyncRequestBodyAndHeaders ()
方法。
以下示例演示了如何异步向 direct:start
端点发送交换。asyncSend ()
方法返回 java.util.concurrent.Future
对象,该对象用于稍后检索调用结果。
import java.util.concurrent.Future; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; ... Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncSend("direct:start", exchange); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the resulting exchange from the Future Exchange result = future.get();
producer 模板还提供以异步方式发送消息正文的方法(例如,使用 asyncSendBody ()
或 asyncRequestBody ()
)。在这种情况下,您可以使用以下帮助程序方法之一从 Future
对象提取返回的消息正文:
<T> T extractFutureBody(Future future, Class<T> type); <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
extractFutureBody ()
方法的第一个版本会阻止,直到调用完成并且回复消息可用。通过 extractFutureBody ()
方法的第二个版本,您可以指定一个超时。这两种方法都具有一个 type 参数,即使用内置 类型
转换器将返回的消息正文转换为指定类型。
以下示例演示了如何使用 asyncRequestBody ()
方法将消息正文发送到 direct:start
端点。然后,使用 blocking extractFutureBody ()
方法从 Future
对象检索回复消息正文。
Future<Object> future = template.asyncRequestBody("direct:start", "Hello"); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the reply message body as a String type String result = template.extractFutureBody(future, String.class);
使用回调进行异步调用
在前面的异步示例中,请求消息在子线程中分配,而回复由主线程检索和处理。producer 模板还会为您提供 选项,但处理子线程中回复的、使用 asyncCallback ()、
或 asyncCallbackSendBody ()
asyncCallbackRequestBody ()
方法之一。在这种情况下,您提供一个回调对象(来自 org.apache.camel.impl.SynchronizationAdapter
类型),一旦回复消息到达后,它会自动调用子线程。
同步
回调接口定义如下:
package org.apache.camel.spi;
import org.apache.camel.Exchange;
public interface Synchronization {
void onComplete(Exchange exchange);
void onFailure(Exchange exchange);
}
在收到正常回复时,调用 onComplete ()
方法,并在收到容错消息回复时调用 onFailure ()
方法。只有其中一种方法被重新调用,因此您必须覆盖这两个方法以确保处理所有类型的回复。
以下示例演示了如何将交换发送到 direct:start
端点,其中由 SynchronizationAdapter
回调对象在子线程中处理回复消息。
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.SynchronizationAdapter; ... Exchange exchange = context.getEndpoint("direct:start").createExchange(); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { assertEquals("Hello World", exchange.getIn().getBody()); } });
如果 SynchronizationAdapter
类是 Synchronization
接口的默认实现,您可以覆盖它们,以在 Complete ()和
回调方法中提供自己的定义。
onFailure ()
您仍然有从主线程访问回复的选项,因为 asyncCallback ()
方法也返回 Future
objectWITH- OVNKubernetes,例如:
// Retrieve the reply from the main thread, specifying a timeout Exchange reply = future.get(10, TimeUnit.SECONDS);
37.1.2. 同步发送
概述
同步发送 方法是可用于调用制作者端点的方法集合,其中当前线程块直到方法调用完成并且收到回复(若有)为止。这些方法与任何种类的消息交换协议兼容。
发送交换
基本 send ()
方法是一种通用方法,它使用交换模式(MEP)的消息交换模式(MEP)将 Exchange
对象的内容发送到端点。返回值为您在制作者端点处理后获得的交换(可能包含 Out 消息,具体取决于 MEP)。
send ()
方法有三个 varieties,用于发送交换,它可让您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或作为 Endpoint
对象。
Exchange send(Exchange exchange); Exchange send(String endpointUri, Exchange exchange); Exchange send(Endpoint endpoint, Exchange exchange);
发送由处理器填充的交换
一般的 send ()
方法的一种简单变体是使用处理器来填充默认交换,而不是显式提供交换对象(请参阅 “使用处理器同步调用”一节 了解详细信息)。
发送由处理器填充的交换的 send ()
方法可让您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或作为 Endpoint
对象。另外,您可以通过提供 模式
参数而不是接受默认值来指定交换的 MEP。
Exchange send(Processor processor); Exchange send(String endpointUri, Processor processor); Exchange send(Endpoint endpoint, Processor processor); Exchange send( String endpointUri, ExchangePattern pattern, Processor processor ); Exchange send( Endpoint endpoint, ExchangePattern pattern, Processor processor );
发送消息正文
如果您只关注您要发送的消息正文内容,您可以使用 sendBody ()
方法将消息正文作为参数提供,并让制作者模板负责将正文插入到默认交换对象中。
sendBody ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI,或者作为 Endpoint
对象。另外,您可以通过提供 模式
参数而不是接受默认值来指定交换的 MEP。没有 模式
参数返回 void
的方法(即使调用可能会引发在某些情况下的回复);以及具有 模式
参数的方法返回 Out 消息正文(如果存在的话)或 In message 的正文(否则)。
void sendBody(Object body); void sendBody(String endpointUri, Object body); void sendBody(Endpoint endpoint, Object body); Object sendBody( String endpointUri, ExchangePattern pattern, Object body ); Object sendBody( Endpoint endpoint, ExchangePattern pattern, Object body );
发送消息正文和标题
出于测试目的,尝试 单个 标头设置的影响通常会很感兴趣,而 sendBodyAndHeader ()
方法对此类标头测试很有用。请提供消息正文和标头设置作为参数,以 发送BodyAndHeader ()
,并让制作者模板负责将正文和标头设置插入到默认交换对象中。
sendBodyAndHeader ()
方法可让您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或作为 Endpoint
对象。另外,您可以通过提供 模式
参数而不是接受默认值来指定交换的 MEP。没有 模式
参数返回 void
的方法(即使调用可能会引发在某些情况下的回复);以及具有 模式
参数的方法返回 Out 消息正文(如果存在的话)或 In message 的正文(否则)。
void sendBodyAndHeader( Object body, String header, Object headerValue ); void sendBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); void sendBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); Object sendBodyAndHeader( String endpointUri, ExchangePattern pattern, Object body, String header, Object headerValue ); Object sendBodyAndHeader( Endpoint endpoint, ExchangePattern pattern, Object body, String header, Object headerValue );
sendBodyAndHeaders ()
方法类似于 sendBodyAndHeader ()
方法,除了仅提供单个标头设置外,这些方法允许您指定完整的标题设置。
void sendBodyAndHeaders( Object body, Map<String, Object> headers ); void sendBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); void sendBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); Object sendBodyAndHeaders( String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers ); Object sendBodyAndHeaders( Endpoint endpoint, ExchangePattern pattern, Object body, Map<String, Object> headers );
发送消息正文和交换属性
您可以使用 sendBodyAndProperty ()
方法尝试设置单一交换属性的效果。请提供消息正文和属性设置作为参数,以 发送BodyAndProperty ()
,并让制作者模板负责将正文和交换属性插入到默认交换对象中。
sendBodyAndProperty ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或作为 Endpoint
对象。另外,您可以通过提供 模式
参数而不是接受默认值来指定交换的 MEP。没有 模式
参数返回 void
的方法(即使调用可能会引发在某些情况下的回复);以及具有 模式
参数的方法返回 Out 消息正文(如果存在的话)或 In message 的正文(否则)。
void sendBodyAndProperty( Object body, String property, Object propertyValue ); void sendBodyAndProperty( String endpointUri, Object body, String property, Object propertyValue ); void sendBodyAndProperty( Endpoint endpoint, Object body, String property, Object propertyValue ); Object sendBodyAndProperty( String endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue ); Object sendBodyAndProperty( Endpoint endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue );
37.1.3. 使用 InOut Pattern 同步请求
概述
同步请求 方法与同步发送方法类似,但请求方法强制进行消息交换模式为 InOut (与请求/恢复语义相关)。因此,如果您预期从制作者端点收到回复,通常最好使用同步请求方法。
请求由处理器填充的交换
基本 request ()
方法是一种通用方法,它使用处理器填充默认交换模式并强制将消息交换模式为 InOut (因此调用 obeys request/reply 语义)。返回值为您在由制作者端点(其中 Out 消息包含回复消息)处理后获得的交换。
用来发送由处理器填充的交换的 request ()
方法,您可以使用以下方法之一来指定目标端点: 作为端点 URI,或作为 Endpoint
对象。
Exchange request(String endpointUri, Processor processor); Exchange request(Endpoint endpoint, Processor processor);
请求邮件正文
如果您只关注请求和回复中消息正文的内容,您可以使用 requestBody ()
方法将请求消息正文作为参数提供,并让制作者模板负责将正文插入到默认交换对象中。
requestBody ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI,或者作为 Endpoint
对象。返回值为回复消息正文(Out message body),它可以 作为普通
对象返回,也可以使用内置的类型转换器(参见 第 34.3 节 “内置类型 Converters”)。
Object requestBody(Object body); <T> T requestBody(Object body, Class<T> type); Object requestBody( String endpointUri, Object body ); <T> T requestBody( String endpointUri, Object body, Class<T> type ); Object requestBody( Endpoint endpoint, Object body ); <T> T requestBody( Endpoint endpoint, Object body, Class<T> type );
请求邮件正文和标题.
您可以使用 requestBodyAndHeader ()
方法尝试设置单个标头值的效果。您可以将消息正文和标头设置为 requestBodyAndHeader ()
的参数,并让制作者模板负责将正文和交换属性插入到默认交换对象中。
requestBodyAndHeader ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI,或作为 Endpoint
对象。返回值为回复消息正文(Out message body),它可以 作为普通
对象返回,也可以使用内置的类型转换器(参见 第 34.3 节 “内置类型 Converters”)。
Object requestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); <T> T requestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue, Class<T> type ); Object requestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); <T> T requestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type );
requestBodyAndHeaders ()
方法类似于 requestBodyAndHeader ()
方法,除了仅提供单个标头设置外,这些方法允许您指定完整的标题设置。
Object requestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); <T> T requestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers, Class<T> type ); Object requestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); <T> T requestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type );
37.1.4. 异步发送
概述
producer 模板提供了多种用于异步调用制作者端点的方法,使得主线程在等待调用完成期间不阻止,并且以后可以检索回复消息。本节中描述的异步发送方法与任何类型的消息交换协议兼容。
发送交换
基本 asyncSend ()
方法采用 Exchange
参数,并通过指定交换模式(MEP)异步调用端点。返回值是一个 java.util.concurrent.Future
对象,它是一个 ticket,您可以在稍后的 time 列表中选择回复消息,以获取有关如何从 Future
对象获取返回值的详细信息,请参阅 “异步调用”一节。
以下 asyncSend ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。
Future<Exchange> asyncSend(String endpointUri, Exchange exchange); Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
发送由处理器填充的交换
一般 asyncSend ()
方法的一种简单变体是使用处理器来填充默认交换,而不是显式提供交换对象。
以下 asyncSend ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。
Future<Exchange> asyncSend(String endpointUri, Processor processor); Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
发送消息正文
如果您只关注您要发送的消息正文内容,您可以使用 asyncSendBody ()
方法异步发送消息正文,并让制作者模板负责将正文插入到默认交换对象中。
asyncSendBody ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。
Future<Object> asyncSendBody(String endpointUri, Object body); Future<Object> asyncSendBody(Endpoint endpoint, Object body);
37.1.5. InOut Pattern 的异步请求
概述
异步请求 方法类似于异步发送方法,但请求方法强制进行消息交换模式为 InOut (与请求/关系语义相关)。因此,如果您期望从制作者端点收到回复,通常最好使用异步请求方法。
请求邮件正文
如果您只关注请求和回复中消息正文的内容,您可以使用 requestBody ()
方法将请求消息正文作为参数提供,并让制作者模板负责将正文插入到默认交换对象中。
asyncRequestBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。从 Future
对象检索的返回值是回复消息正文(Out message body),它可以 作为普通
对象返回,也可以使用内置类型转换器(参见 “异步调用”一节)。
Future<Object> asyncRequestBody( String endpointUri, Object body ); <T> Future<T> asyncRequestBody( String endpointUri, Object body, Class<T> type ); Future<Object> asyncRequestBody( Endpoint endpoint, Object body ); <T> Future<T> asyncRequestBody( Endpoint endpoint, Object body, Class<T> type );
请求邮件正文和标题.
您可以使用 asyncRequestBodyAndHeader ()
方法尝试设置单个标头值的效果。您可以将消息正文和标头设置为 asyncRequestBodyAndHeader ()
,并让 producer 模板负责将正文和交换属性插入到默认交换对象中。
asyncRequestBodyAndHeader ()
方法可让您以以下方式之一指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。从 Future
对象检索的返回值是回复消息正文(Out message body),它可以 作为普通
对象返回,也可以使用内置类型转换器(参见 “异步调用”一节)。
Future<Object> asyncRequestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); <T> Future<T> asyncRequestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue, Class<T> type ); Future<Object> asyncRequestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); <T> Future<T> asyncRequestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type );
asyncRequestBodyAndHeaders ()
方法类似于 asyncRequestBodyAndHeader ()
方法,除了提供单一标头设置外,这些方法允许您指定标题设置的完整散列映射。
Future<Object> asyncRequestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); <T> Future<T> asyncRequestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers, Class<T> type ); Future<Object> asyncRequestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); <T> Future<T> asyncRequestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type );
37.1.6. 使用回调异步发送
概述
producer 模板也提供 选项,用于处理用于调用 producer 端点的同一子线程中回复消息。在这种情况下,您提供一个回调对象,在收到回复消息时自动在子线程中调用它。换而言之,使用回调方法的异步发送 可让您在主线程中启动调用,然后让制作者端点的所有关联处理在 producer 端点上均须等待回复并处理在子线程中异步处理回复便可以处理该制作者端点。
发送交换
基本 asyncCallback ()
方法采用 Exchange
参数,并通过指定交换模式(MEP)异步调用端点。这个方法与用于交换的 asyncSend ()
方法类似,但它有额外的 org.apache.camel.spi.Synchronization
参数,它有两个方法: onComplete ()
和 onFailure ()
。有关如何使用 同步
回调的详情,请参考 “使用回调进行异步调用”一节。
以下 asyncCallback ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。
Future<Exchange> asyncCallback( String endpointUri, Exchange exchange, Synchronization onCompletion ); Future<Exchange> asyncCallback( Endpoint endpoint, Exchange exchange, Synchronization onCompletion );
发送由处理器填充的交换
处理器的 asyncCallback ()
方法调用处理器来填充默认交换模式,并强制进行消息交换模式为 InOut (因此调用 obeys request/reply语义)。
以下 asyncCallback ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。
Future<Exchange> asyncCallback( String endpointUri, Processor processor, Synchronization onCompletion ); Future<Exchange> asyncCallback( Endpoint endpoint, Processor processor, Synchronization onCompletion );
发送消息正文
如果您只关注您要发送的消息正文内容,您可以使用 asyncCallbackSendBody ()
方法异步发送消息正文,并让制作者模板负责将正文插入到默认交换对象中。
asyncCallbackSendBody ()
方法可让您以以下方式之一指定目标端点: 作为端点 URI 或作为 Endpoint
对象。
Future<Object> asyncCallbackSendBody( String endpointUri, Object body, Synchronization onCompletion ); Future<Object> asyncCallbackSendBody( Endpoint endpoint, Object body, Synchronization onCompletion );
请求邮件正文
如果您只关注请求中消息正文的内容和回复中的内容,您可以使用 asyncCallbackRequestBody ()
方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认交换对象中。
asyncCallbackRequestBody ()
方法可让您以以下方式之一指定目标端点: 作为端点 URI,或者作为 Endpoint
对象。
Future<Object> asyncCallbackRequestBody( String endpointUri, Object body, Synchronization onCompletion ); Future<Object> asyncCallbackRequestBody( Endpoint endpoint, Object body, Synchronization onCompletion );