第 37 章 生产者和消费者模板
摘要
Apache Camel 中的生成者和消费者模板在 Spring 容器 API 的功能后建模,通过一个简单易用的 API (称为 模板 )来提供资源的访问。如果是 Apache Camel,生产者模板和消费者模板提供了简化的接口,用于向和接收来自生成者端点和消费者端点的消息。
37.1. 使用 Producer 模板
37.1.1. Producer 模板简介
概述
producer 模板支持各种不同的方法来调用制作者端点。有方法支持请求消息的不同格式(作为 Exchange
对象,作为消息正文,作为带单个标头设置的消息正文,等等),有支持同步和异步调用方式的方法。总体而言,生成者模板方法可分为以下类别:
或者,请参阅 第 37.2 节 “使用 Fluent Producer 模板”。
同步调用
同步调用端点的方法具有 发送后缀()和
的形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 请求
后缀()send ()
, sendBody ()
, 和 sendBodyAndHeader ()
(这些方法分别发送 Exchange
对象、消息正文或消息正文和标头值)。如果要强制 MEP 为 InOut (请求/回复语义),您可以改为调用 request ()
、requestBody ()
和 requestBodyAndHeader ()
方法。
以下示例演示了如何创建 ProducerTemplate
实例,并使用它来发送消息正文到 activemq:MyQueue
端点。示例还演示了如何使用 sendBodyAndHeader ()
发送消息正文和标头值。
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue template.sendBody("activemq:MyQueue", "<hello>world!</hello>"); // Send with a body and header template.sendBodyAndHeader( "activemq:MyQueue", "<hello>world!</hello>", "CustomerRating", "Gold" );
使用处理器进行同步调用
同步调用的一个特殊情况是您为 send ()
方法提供 Processor
参数而不是 Exchange
参数。在这种情况下,生成者模板会隐式要求指定的端点创建 Exchange
实例(通常并非始终默认具有 InOnly MEP)。然后,此默认交换传递到处理器,它将初始化交换对象的内容。
以下示例演示了如何将 MyProcessor
处理器初始化的交换发送到 activemq:MyQueue
端点。
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue, using a processor to initialize template.send("activemq:MyQueue", new MyProcessor());
MyProcessor
类按照以下示例所示实施。除了设置 In message body (如此处所示),您还可以初始化消息标题和交换属性。
import org.apache.camel.Processor; import org.apache.camel.Exchange; ... public class MyProcessor implements Processor { public MyProcessor() { } public void process(Exchange ex) { ex.getIn().setBody("<hello>world!</hello>"); } }
异步调用
异步 调用端点的方法具有 asyncSendSuffix()和
形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 asyncRequestSuffix()
asyncSend ()
和 asyncSendBody ()
(其中这些方法分别发送 Exchange
对象或消息正文)。如果要强制 MEP 为 InOut (request/reply 语义),您可以调用 asyncRequestBody ()
、syncRequestBodyAndHeader ()、syncRequestBodyAndHeader ()
和 asyncRequestBodyAndHeaders ()
方法。
以下示例演示了如何异步向 direct:start
端点发送交换。asyncSend ()
方法返回一个 java.util.concurrent.Future
对象,用于稍后检索调用结果。
import java.util.concurrent.Future; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; ... Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncSend("direct:start", exchange); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the resulting exchange from the Future Exchange result = future.get();
producer 模板还提供异步发送消息正文的方法(例如,使用 asyncSendBody ()
或 asyncRequestBody ()
)。在这种情况下,您可以使用以下帮助程序方法之一从 Future
对象中提取返回的消息正文:
<T> T extractFutureBody(Future future, Class<T> type); <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
extractFutureBody ()
方法块的第一个版本,直到调用完成并且回复消息可用为止。extractFutureBody ()
方法的第二个版本允许您指定超时。两种方法都具有类型参数,键入
,它将返回的消息正文转换为指定类型(使用内置类型转换器)。
以下示例演示了如何使用 asyncRequestBody ()
方法将消息正文发送到 direct:start
端点。然后,使用 blocking extractFutureBody ()
方法从 Future
对象检索回复消息正文。
Future<Object> future = template.asyncRequestBody("direct:start", "Hello"); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the reply message body as a String type String result = template.extractFutureBody(future, String.class);
使用回调进行异步调用
在前面的异步示例中,请求消息在子线程中分配,而回复则由主线程检索和处理。producer 模板还为您提供了选项,但子线程中的处理回复使用 asyncCallback ()
, , asyncCallbackSendBody ()
方法之一,或 asyncCallbackRequestBody ()
方法。在这种情况下,您可以提供一个回调对象( org.apache.camel.impl.SynchronizationAdapter
类型),它会在回复消息到达时在子线程中自动调用。
同步
回调接口定义如下:
package org.apache.camel.spi;
import org.apache.camel.Exchange;
public interface Synchronization {
void onComplete(Exchange exchange);
void onFailure(Exchange exchange);
}
如果在收到正常回复时调用 onComplete ()
方法,并且在收到错误消息回复时调用 onFailure ()
方法。只有其中一种方法会调用回来,因此您必须覆盖这两个方法,以确保处理所有类型的回复。
以下示例演示了如何将交换发送到 direct:start
端点,其中 SynchronizationAdapter
回调对象在子线程中处理回复消息。
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.SynchronizationAdapter; ... Exchange exchange = context.getEndpoint("direct:start").createExchange(); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { assertEquals("Hello World", exchange.getIn().getBody()); } });
其中
类是同步接口的默认实现,您可以覆盖它来提供您自己对 Synchronization
AdapteronComplete ()
和 onFailure ()
回调方法的定义。
您仍然有从主线程访问回复的选项,因为 asyncCallback ()
方法也会返回 Future
object swig-wagon 例如:
// Retrieve the reply from the main thread, specifying a timeout Exchange reply = future.get(10, TimeUnit.SECONDS);
37.1.2. 同步发送
概述
同步发送 方法是一组可用于调用制作者端点的方法集合,其中当前线程块直到方法调用完成并且收到回复(若有)。这些方法与任何类型的消息交换协议兼容。
发送交换
基本的 send ()
方法是一种通用方法,利用交换的消息交换模式(MEP)将 Exchange
对象的内容发送到端点。返回值是您在生成者端点处理后获取的交换(可能包含 Out 消息,具体取决于 MEP)。
发送交换有三种 send ()
方法,允许您以以下一种方式指定目标端点: 作为默认端点、端点、端点 URI 或 Endpoint
对象。
Exchange send(Exchange exchange); Exchange send(String endpointUri, Exchange exchange); Exchange send(Endpoint endpoint, Exchange exchange);
发送由处理器填充的交换
常规 send ()
方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象(详情请参阅 “使用处理器进行同步调用”一节 )。
发送由处理器填充的交换的 send ()
方法可让您以以下一种方式指定目标端点: 作为默认端点、端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。
Exchange send(Processor processor); Exchange send(String endpointUri, Processor processor); Exchange send(Endpoint endpoint, Processor processor); Exchange send( String endpointUri, ExchangePattern pattern, Processor processor ); Exchange send( Endpoint endpoint, ExchangePattern pattern, Processor processor );
发送消息正文
如果您只关注您要发送的消息正文的内容,您可以使用 sendBody ()
方法提供消息正文作为参数,并让制作者模板将正文插入到默认交换对象中。
sendBody ()
方法允许您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。没有 模式
参数的方法返回 void
(即使调用在某些情况下可能会给回复),并且具有 pattern
参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。
void sendBody(Object body); void sendBody(String endpointUri, Object body); void sendBody(Endpoint endpoint, Object body); Object sendBody( String endpointUri, ExchangePattern pattern, Object body ); Object sendBody( Endpoint endpoint, ExchangePattern pattern, Object body );
发送消息正文和标题
出于测试目的,尝试 单个 标头设置的影响和 sendBodyAndHeader ()
方法对这种标头测试很有用。您提供消息正文和标头设置作为 sendBodyAndHeader ()
的参数,并允许制作者模板将正文和标头设置插入到默认交换对象中。
sendBodyAndHeader ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。没有 模式
参数的方法返回 void
(即使调用在某些情况下可能会给回复),并且具有 pattern
参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。
void sendBodyAndHeader( Object body, String header, Object headerValue ); void sendBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); void sendBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); Object sendBodyAndHeader( String endpointUri, ExchangePattern pattern, Object body, String header, Object headerValue ); Object sendBodyAndHeader( Endpoint endpoint, ExchangePattern pattern, Object body, String header, Object headerValue );
sendBodyAndHeaders ()
方法与 sendBodyAndHeader ()
方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。
void sendBodyAndHeaders( Object body, Map<String, Object> headers ); void sendBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); void sendBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); Object sendBodyAndHeaders( String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers ); Object sendBodyAndHeaders( Endpoint endpoint, ExchangePattern pattern, Object body, Map<String, Object> headers );
发送消息正文和交换属性
您可以使用 sendBodyAndProperty ()
方法尝试设置单个交换属性的影响。您提供消息正文和属性设置作为 sendBodyAndProperty ()
的参数,并让制作者模板负责将正文和交换属性插入到默认的交换对象中。
sendBodyAndProperty ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。没有 模式
参数的方法返回 void
(即使调用在某些情况下可能会给回复),并且具有 pattern
参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。
void sendBodyAndProperty( Object body, String property, Object propertyValue ); void sendBodyAndProperty( String endpointUri, Object body, String property, Object propertyValue ); void sendBodyAndProperty( Endpoint endpoint, Object body, String property, Object propertyValue ); Object sendBodyAndProperty( String endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue ); Object sendBodyAndProperty( Endpoint endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue );
37.1.3. 使用 InOut Pattern 同步请求
概述
同步请求 方法与同步发送方法类似,但请求方法强制消息交换模式为 InOut (代表请求/回复语义)。因此,如果您预期从生成者端点收到回复,通常最好使用同步请求方法。
请求由处理器填充的交换
基本 request ()
方法是一种通用方法,它使用处理器填充默认交换,并强制消息交换模式成为 InOut (因此调用遵循请求/回复语义)。返回值是您在生成者端点处理后获取的交换,其中 Out 消息包含回复消息。
发送由处理器填充的交换的 request ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Exchange request(String endpointUri, Processor processor); Exchange request(Endpoint endpoint, Processor processor);
请求消息正文
如果您只关注请求中的消息正文内容,并在回复中,您可以使用 requestBody ()
方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认的交换对象中。
requestBody ()
方法允许您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。返回值是回复消息的正文(Out message body),可以是 plain Object
,也可以使用内置类型转换器(请参阅 第 34.3 节 “built-In Type Converters”)转换为特定类型的 T
。
Object requestBody(Object body); <T> T requestBody(Object body, Class<T> type); Object requestBody( String endpointUri, Object body ); <T> T requestBody( String endpointUri, Object body, Class<T> type ); Object requestBody( Endpoint endpoint, Object body ); <T> T requestBody( Endpoint endpoint, Object body, Class<T> type );
请求消息正文和标头
您可以使用 requestBodyAndHeader ()
方法尝试设置单个标头值的影响。您提供消息正文和标头设置作为 requestBodyAndHeader ()
的参数,并允许制作者模板将正文和交换属性插入到默认交换对象中。
requestBodyAndHeader ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。返回值是回复消息的正文(Out message body),可以是 plain Object
,也可以使用内置类型转换器(请参阅 第 34.3 节 “built-In Type Converters”)转换为特定类型的 T
。
Object requestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); <T> T requestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue, Class<T> type ); Object requestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); <T> T requestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type );
requestBodyAndHeaders ()
方法与 requestBodyAndHeader ()
方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。
Object requestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); <T> T requestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers, Class<T> type ); Object requestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); <T> T requestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type );
37.1.4. 异步发送
概述
producer 模板提供各种异步调用制作者端点的方法,以便在等待调用完成时阻止主线程,并且稍后可以检索回复消息。本节中描述的异步发送方法与任何类型的消息交换协议兼容。
发送交换
基本的 asyncSend ()
方法采用 Exchange
参数,并使用指定交换的消息交换模式(MEP)异步调用端点。返回值是一个 java.util.concurrent.Future
对象,它是稍后用来收集回复消息的票据,请参阅 “异步调用”一节。
以下 asyncSend ()
方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Exchange> asyncSend(String endpointUri, Exchange exchange); Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
发送由处理器填充的交换
常规 asyncSend ()
方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象。
以下 asyncSend ()
方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Exchange> asyncSend(String endpointUri, Processor processor); Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
发送消息正文
如果您只关注您要发送的消息正文的内容,您可以使用 asyncSendBody ()
方法异步发送消息正文,并允许制作者模板将正文插入到默认交换对象中。
asyncSendBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Object> asyncSendBody(String endpointUri, Object body); Future<Object> asyncSendBody(Endpoint endpoint, Object body);
37.1.5. 使用 InOut Pattern 的异步请求
概述
异步请求 方法与异步发送方法类似,但请求方法强制消息交换模式为 InOut (代表请求/回复语义)。因此,如果您预期从生成者端点收到回复,通常最好使用异步请求方法。
请求消息正文
如果您只关注请求中的消息正文内容,并在回复中,您可以使用 requestBody ()
方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认的交换对象中。
asyncRequestBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。从 Future
对象检索的返回值是回复消息的正文(Out message body),它可以作为普通 对象
返回,或使用内置的类型 T
转换程序(请参阅 “异步调用”一节)。
Future<Object> asyncRequestBody( String endpointUri, Object body ); <T> Future<T> asyncRequestBody( String endpointUri, Object body, Class<T> type ); Future<Object> asyncRequestBody( Endpoint endpoint, Object body ); <T> Future<T> asyncRequestBody( Endpoint endpoint, Object body, Class<T> type );
请求消息正文和标头
您可以使用 asyncRequestBodyAndHeader ()
方法尝试设置单个标头值的影响。您提供消息正文和标头设置作为 asyncRequestBodyAndHeader ()
的参数,并让制作者模板负责将正文和交换属性插入到默认交换对象中。
asyncRequestBodyAndHeader ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。从 Future
对象检索的返回值是回复消息的正文(Out message body),它可以作为普通 对象
返回,或使用内置的类型 T
转换程序(请参阅 “异步调用”一节)。
Future<Object> asyncRequestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); <T> Future<T> asyncRequestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue, Class<T> type ); Future<Object> asyncRequestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); <T> Future<T> asyncRequestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type );
asyncRequestBodyAndHeaders ()
方法与 asyncRequestBodyAndHeader ()
方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。
Future<Object> asyncRequestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); <T> Future<T> asyncRequestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers, Class<T> type ); Future<Object> asyncRequestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); <T> Future<T> asyncRequestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type );
37.1.6. 使用回调异步发送
概述
producer 模板还提供 选项,用于处理用于调用制作者端点的同一子线程中的回复消息。在这种情况下,您可以提供一个回调对象,它会在收到回复消息后立即在子线程中调用。换句话说,使用回调方法的异步发送 可让您在主线程中启动调用,然后具有生成者端点的所有关联处理因此,等待回复并在子线程中异步处理回复。
发送交换
基本 asyncCallback ()
方法采用 Exchange
参数,并使用指定交换的消息交换模式(MEP)异步调用端点。这个方法类似于交换的 asyncSend ()
方法,但它采用额外的 org.apache.camel.spi.Synchronization
参数,它是一个具有两种方法: onComplete ()
和 onFailure ()
的回调接口。有关如何使用 同步
回调的详情,请参考 “使用回调进行异步调用”一节。
以下 asyncCallback ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Exchange> asyncCallback( String endpointUri, Exchange exchange, Synchronization onCompletion ); Future<Exchange> asyncCallback( Endpoint endpoint, Exchange exchange, Synchronization onCompletion );
发送由处理器填充的交换
处理器的 asyncCallback ()
方法调用处理器来填充默认交换,并强制消息交换模式成为 InOut (因此调用模糊请求/回复语义)。
以下 asyncCallback ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Exchange> asyncCallback( String endpointUri, Processor processor, Synchronization onCompletion ); Future<Exchange> asyncCallback( Endpoint endpoint, Processor processor, Synchronization onCompletion );
发送消息正文
如果您只关注您要发送的消息正文的内容,您可以使用 asyncCallbackSendBody ()
方法异步发送消息正文,并允许制作者模板将正文插入到默认交换对象中。
asyncCallbackSendBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Object> asyncCallbackSendBody( String endpointUri, Object body, Synchronization onCompletion ); Future<Object> asyncCallbackSendBody( Endpoint endpoint, Object body, Synchronization onCompletion );
请求消息正文
如果您只关注请求中的消息正文内容,并在回复中,您可以使用 asyncCallbackRequestBody ()
方法提供请求消息正文作为参数,并使制作者模板负责将正文插入到默认的交换对象中。
asyncCallbackRequestBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Object> asyncCallbackRequestBody( String endpointUri, Object body, Synchronization onCompletion ); Future<Object> asyncCallbackRequestBody( Endpoint endpoint, Object body, Synchronization onCompletion );