第 37 章 生产者和消费者模板


摘要

Apache Camel 中的生成者和消费者模板在 Spring 容器 API 的功能后建模,通过一个简单易用的 API (称为 模板 )来提供资源的访问。如果是 Apache Camel,生产者模板和消费者模板提供了简化的接口,用于向和接收来自生成者端点和消费者端点的消息。

37.1. 使用 Producer 模板

37.1.1. Producer 模板简介

概述

producer 模板支持各种不同的方法来调用制作者端点。有方法支持请求消息的不同格式(作为 Exchange 对象,作为消息正文,作为带单个标头设置的消息正文,等等),有支持同步和异步调用方式的方法。总体而言,生成者模板方法可分为以下类别:

或者,请参阅 第 37.2 节 “使用 Fluent Producer 模板”

同步调用

同步调用端点的方法具有 发送后缀()和 请求 后缀() 的形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 send (), sendBody (), 和 sendBodyAndHeader () (这些方法分别发送 Exchange 对象、消息正文或消息正文和标头值)。如果要强制 MEP 为 InOut (请求/回复语义),您可以改为调用 request ()requestBody ()requestBodyAndHeader () 方法。

以下示例演示了如何创建 ProducerTemplate 实例,并使用它来发送消息正文到 activemq:MyQueue 端点。示例还演示了如何使用 sendBodyAndHeader () 发送消息正文和标头值。

import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultProducerTemplate
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue
template.sendBody("activemq:MyQueue", "<hello>world!</hello>");

// Send with a body and header
template.sendBodyAndHeader(
    "activemq:MyQueue",
    "<hello>world!</hello>",
    "CustomerRating", "Gold" );

使用处理器进行同步调用

同步调用的一个特殊情况是您为 send () 方法提供 Processor 参数而不是 Exchange 参数。在这种情况下,生成者模板会隐式要求指定的端点创建 Exchange 实例(通常并非始终默认具有 InOnly MEP)。然后,此默认交换传递到处理器,它将初始化交换对象的内容。

以下示例演示了如何将 MyProcessor 处理器初始化的交换发送到 activemq:MyQueue 端点。

import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultProducerTemplate
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue, using a processor to initialize
template.send("activemq:MyQueue", new MyProcessor());

MyProcessor 类按照以下示例所示实施。除了设置 In message body (如此处所示),您还可以初始化消息标题和交换属性。

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
...
public class MyProcessor implements Processor {
    public MyProcessor() { }

    public void process(Exchange ex) {
        ex.getIn().setBody("<hello>world!</hello>");
    }
}

异步调用

异步 调用端点的方法具有 asyncSendSuffix()和 asyncRequestSuffix() 形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 asyncSend ()asyncSendBody () (其中这些方法分别发送 Exchange 对象或消息正文)。如果要强制 MEP 为 InOut (request/reply 语义),您可以调用 asyncRequestBody ()、syncRequestBodyAndHeader ()、syncRequestBodyAndHeader ()asyncRequestBodyAndHeaders () 方法。

以下示例演示了如何异步向 direct:start 端点发送交换。asyncSend () 方法返回一个 java.util.concurrent.Future 对象,用于稍后检索调用结果。

import java.util.concurrent.Future;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
...
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncSend("direct:start", exchange);

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the resulting exchange from the Future
Exchange result = future.get();

producer 模板还提供异步发送消息正文的方法(例如,使用 asyncSendBody ()asyncRequestBody ())。在这种情况下,您可以使用以下帮助程序方法之一从 Future 对象中提取返回的消息正文:

<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;

extractFutureBody () 方法块的第一个版本,直到调用完成并且回复消息可用为止。extractFutureBody () 方法的第二个版本允许您指定超时。两种方法都具有类型参数,键入,它将返回的消息正文转换为指定类型(使用内置类型转换器)。

以下示例演示了如何使用 asyncRequestBody () 方法将消息正文发送到 direct:start 端点。然后,使用 blocking extractFutureBody () 方法从 Future 对象检索回复消息正文。

Future<Object> future = template.asyncRequestBody("direct:start", "Hello");

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the reply message body as a String type
String result = template.extractFutureBody(future, String.class);

使用回调进行异步调用

在前面的异步示例中,请求消息在子线程中分配,而回复则由主线程检索和处理。producer 模板还为您提供了选项,但子线程中的处理回复使用 asyncCallback (), , asyncCallbackSendBody () 方法之一,或 asyncCallbackRequestBody () 方法。在这种情况下,您可以提供一个回调对象( org.apache.camel.impl.SynchronizationAdapter 类型),它会在回复消息到达时在子线程中自动调用。

同步 回调接口定义如下:

package org.apache.camel.spi;

import org.apache.camel.Exchange;

public interface Synchronization {
    void onComplete(Exchange exchange);
    void onFailure(Exchange exchange);
}

如果在收到正常回复时调用 onComplete () 方法,并且在收到错误消息回复时调用 onFailure () 方法。只有其中一种方法会调用回来,因此您必须覆盖这两个方法,以确保处理所有类型的回复。

以下示例演示了如何将交换发送到 direct:start 端点,其中 SynchronizationAdapter 回调对象在子线程中处理回复消息。

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronizationAdapter;
...
Exchange exchange = context.getEndpoint("direct:start").createExchange();
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
    @Override
    public void onComplete(Exchange exchange) {
        assertEquals("Hello World", exchange.getIn().getBody());
    }
});

其中 Synchronization Adapter 类是同步接口的默认实现,您可以覆盖它来提供您自己对 onComplete ()onFailure () 回调方法的定义。

您仍然有从主线程访问回复的选项,因为 asyncCallback () 方法也会返回 Future object swig-wagon 例如:

// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);

37.1.2. 同步发送

概述

同步发送 方法是一组可用于调用制作者端点的方法集合,其中当前线程块直到方法调用完成并且收到回复(若有)。这些方法与任何类型的消息交换协议兼容。

发送交换

基本的 send () 方法是一种通用方法,利用交换的消息交换模式(MEP)将 Exchange 对象的内容发送到端点。返回值是您在生成者端点处理后获取的交换(可能包含 Out 消息,具体取决于 MEP)。

发送交换有三种 send () 方法,允许您以以下一种方式指定目标端点: 作为默认端点、端点、端点 URI 或 Endpoint 对象。

Exchange send(Exchange exchange);
Exchange send(String endpointUri, Exchange exchange);
Exchange send(Endpoint endpoint, Exchange exchange);

发送由处理器填充的交换

常规 send () 方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象(详情请参阅 “使用处理器进行同步调用”一节 )。

发送由处理器填充的交换的 send () 方法可让您以以下一种方式指定目标端点: 作为默认端点、端点 URI 或 Endpoint 对象。此外,您还可以通过提供 pattern 参数来指定交换的 MEP,而不接受默认值。

Exchange send(Processor processor);
Exchange send(String endpointUri, Processor processor);
Exchange send(Endpoint endpoint, Processor processor);
Exchange send(
    String endpointUri,
    ExchangePattern pattern,
    Processor processor
);
Exchange send(
    Endpoint endpoint,
    ExchangePattern pattern,
    Processor processor
);

发送消息正文

如果您只关注您要发送的消息正文的内容,您可以使用 sendBody () 方法提供消息正文作为参数,并让制作者模板将正文插入到默认交换对象中。

sendBody () 方法允许您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。此外,您还可以通过提供 pattern 参数来指定交换的 MEP,而不接受默认值。没有 模式 参数的方法返回 void (即使调用在某些情况下可能会给回复),并且具有 pattern 参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。

void sendBody(Object body);
void sendBody(String endpointUri, Object body);
void sendBody(Endpoint endpoint, Object body);
Object sendBody(
    String endpointUri,
    ExchangePattern pattern,
    Object body
);
Object sendBody(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body
);

发送消息正文和标题

出于测试目的,尝试 单个 标头设置的影响和 sendBodyAndHeader () 方法对这种标头测试很有用。您提供消息正文和标头设置作为 sendBodyAndHeader () 的参数,并允许制作者模板将正文和标头设置插入到默认交换对象中。

sendBodyAndHeader () 方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。此外,您还可以通过提供 pattern 参数来指定交换的 MEP,而不接受默认值。没有 模式 参数的方法返回 void (即使调用在某些情况下可能会给回复),并且具有 pattern 参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。

void sendBodyAndHeader(
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);

sendBodyAndHeaders () 方法与 sendBodyAndHeader () 方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。

void sendBodyAndHeaders(
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);

发送消息正文和交换属性

您可以使用 sendBodyAndProperty () 方法尝试设置单个交换属性的影响。您提供消息正文和属性设置作为 sendBodyAndProperty () 的参数,并让制作者模板负责将正文和交换属性插入到默认的交换对象中。

sendBodyAndProperty () 方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。此外,您还可以通过提供 pattern 参数来指定交换的 MEP,而不接受默认值。没有 模式 参数的方法返回 void (即使调用在某些情况下可能会给回复),并且具有 pattern 参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。

void sendBodyAndProperty(
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    String endpointUri,
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    Endpoint endpoint,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    String endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);

37.1.3. 使用 InOut Pattern 同步请求

概述

同步请求 方法与同步发送方法类似,但请求方法强制消息交换模式为 InOut (代表请求/回复语义)。因此,如果您预期从生成者端点收到回复,通常最好使用同步请求方法。

请求由处理器填充的交换

基本 request () 方法是一种通用方法,它使用处理器填充默认交换,并强制消息交换模式成为 InOut (因此调用遵循请求/回复语义)。返回值是您在生成者端点处理后获取的交换,其中 Out 消息包含回复消息。

发送由处理器填充的交换的 request () 方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Exchange request(String endpointUri, Processor processor);
Exchange request(Endpoint endpoint, Processor processor);

请求消息正文

如果您只关注请求中的消息正文内容,并在回复中,您可以使用 requestBody () 方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认的交换对象中。

requestBody () 方法允许您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。返回值是回复消息的正文(Out message body),可以是 plain Object,也可以使用内置类型转换器(请参阅 第 34.3 节 “built-In Type Converters”)转换为特定类型的 T

Object requestBody(Object body);
<T> T requestBody(Object body, Class<T> type);
Object requestBody(
    String endpointUri,
    Object body
);
<T> T requestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Object requestBody(
    Endpoint endpoint,
    Object body
);
<T> T requestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

请求消息正文和标头

您可以使用 requestBodyAndHeader () 方法尝试设置单个标头值的影响。您提供消息正文和标头设置作为 requestBodyAndHeader () 的参数,并允许制作者模板将正文和交换属性插入到默认交换对象中。

requestBodyAndHeader () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。返回值是回复消息的正文(Out message body),可以是 plain Object,也可以使用内置类型转换器(请参阅 第 34.3 节 “built-In Type Converters”)转换为特定类型的 T

Object requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Object requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

requestBodyAndHeaders () 方法与 requestBodyAndHeader () 方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。

Object requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Object requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

37.1.4. 异步发送

概述

producer 模板提供各种异步调用制作者端点的方法,以便在等待调用完成时阻止主线程,并且稍后可以检索回复消息。本节中描述的异步发送方法与任何类型的消息交换协议兼容。

发送交换

基本的 asyncSend () 方法采用 Exchange 参数,并使用指定交换的消息交换模式(MEP)异步调用端点。返回值是一个 java.util.concurrent.Future 对象,它是稍后用来收集回复消息的票据,请参阅 “异步调用”一节

以下 asyncSend () 方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);

发送由处理器填充的交换

常规 asyncSend () 方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象。

以下 asyncSend () 方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncSend(String endpointUri, Processor processor);
Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);

发送消息正文

如果您只关注您要发送的消息正文的内容,您可以使用 asyncSendBody () 方法异步发送消息正文,并允许制作者模板将正文插入到默认交换对象中。

asyncSendBody () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Object> asyncSendBody(String endpointUri, Object body);
Future<Object> asyncSendBody(Endpoint endpoint, Object body);

37.1.5. 使用 InOut Pattern 的异步请求

概述

异步请求 方法与异步发送方法类似,但请求方法强制消息交换模式为 InOut (代表请求/回复语义)。因此,如果您预期从生成者端点收到回复,通常最好使用异步请求方法。

请求消息正文

如果您只关注请求中的消息正文内容,并在回复中,您可以使用 requestBody () 方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认的交换对象中。

asyncRequestBody () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。从 Future 对象检索的返回值是回复消息的正文(Out message body),它可以作为普通 对象 返回,或使用内置的类型 T 转换程序(请参阅 “异步调用”一节)。

Future<Object> asyncRequestBody(
    String endpointUri,
    Object body
);
<T> Future<T> asyncRequestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Future<Object> asyncRequestBody(
    Endpoint endpoint,
    Object body
);
<T> Future<T> asyncRequestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

请求消息正文和标头

您可以使用 asyncRequestBodyAndHeader () 方法尝试设置单个标头值的影响。您提供消息正文和标头设置作为 asyncRequestBodyAndHeader () 的参数,并让制作者模板负责将正文和交换属性插入到默认交换对象中。

asyncRequestBodyAndHeader () 方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。从 Future 对象检索的返回值是回复消息的正文(Out message body),它可以作为普通 对象 返回,或使用内置的类型 T 转换程序(请参阅 “异步调用”一节)。

Future<Object> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

asyncRequestBodyAndHeaders () 方法与 asyncRequestBodyAndHeader () 方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。

Future<Object> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

37.1.6. 使用回调异步发送

概述

producer 模板还提供 选项,用于处理用于调用制作者端点的同一子线程中的回复消息。在这种情况下,您可以提供一个回调对象,它会在收到回复消息后立即在子线程中调用。换句话说,使用回调方法的异步发送 可让您在主线程中启动调用,然后具有生成者端点的所有关联处理因此,等待回复并在子线程中异步处理回复。

发送交换

基本 asyncCallback () 方法采用 Exchange 参数,并使用指定交换的消息交换模式(MEP)异步调用端点。这个方法类似于交换的 asyncSend () 方法,但它采用额外的 org.apache.camel.spi.Synchronization 参数,它是一个具有两种方法: onComplete ()onFailure () 的回调接口。有关如何使用 同步 回调的详情,请参考 “使用回调进行异步调用”一节

以下 asyncCallback () 方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncCallback(
    String endpointUri,
    Exchange exchange,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Exchange exchange,
    Synchronization onCompletion
);

发送由处理器填充的交换

处理器的 asyncCallback () 方法调用处理器来填充默认交换,并强制消息交换模式成为 InOut (因此调用模糊请求/回复语义)。

以下 asyncCallback () 方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncCallback(
    String endpointUri,
    Processor processor,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Processor processor,
    Synchronization onCompletion
);

发送消息正文

如果您只关注您要发送的消息正文的内容,您可以使用 asyncCallbackSendBody () 方法异步发送消息正文,并允许制作者模板将正文插入到默认交换对象中。

asyncCallbackSendBody () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Object> asyncCallbackSendBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackSendBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);

请求消息正文

如果您只关注请求中的消息正文内容,并在回复中,您可以使用 asyncCallbackRequestBody () 方法提供请求消息正文作为参数,并使制作者模板负责将正文插入到默认的交换对象中。

asyncCallbackRequestBody () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Object> asyncCallbackRequestBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackRequestBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.