第 37 章 producer 和 Consumer Templates


摘要

Apache Camel 中的生产者和消费者模板在 Spring 容器 API 的功能后进行建模,通过一个简化的易用 API (称为 模板 )提供对资源的访问。对于 Apache Camel,生成者模板和消费者模板提供了简化的接口,用于将消息发送到生产者端点和消费者端点,并接收消息。

37.1. 使用 Producer 模板

37.1.1. Producer 模板简介

概述

producer 模板支持各种不同的方法来调用制作者端点。有方法支持请求消息的不同格式(作为消息正文,作为消息正文,使用单个标头设置等),并有方法支持同步和异步调用方式。总体而言,生成者模板方法可以分组到以下类别中:

或者,请参阅 第 37.2 节 “使用 Fluent Producer 模板”

同步调用

同步调用端点的方法具有 发送后缀()和 请求 后缀() 的形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 send ()sendBody ()sendBodyAndHeader () (这些方法分别发送 Exchange 对象、消息正文或消息正文和标头值)。如果要强制 MEP 为 InOut (请求/回复语义),您可以改为调用 request ()requestBody ()requestBodyAndHeader () 方法。

以下示例演示了如何创建 ProducerTemplate 实例,并使用它来向 activemq:MyQueue 端点发送消息正文。该示例还演示了如何使用 sendBodyAndHeader () 发送邮件正文和标头值。

import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultProducerTemplate
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue
template.sendBody("activemq:MyQueue", "<hello>world!</hello>");

// Send with a body and header
template.sendBodyAndHeader(
    "activemq:MyQueue",
    "<hello>world!</hello>",
    "CustomerRating", "Gold" );

使用处理器进行同步调用

特殊的同步调用情形是您通过 Processor 参数而不是 Exchange 参数提供 send () 方法。在这种情况下,生成者模板会隐式要求指定的端点创建 Exchange 实例(通常为,但并不总是默认具有 InOnly MEP)。然后,此默认交换会被传递给处理器,它会初始化交换对象的内容。

以下示例演示了如何将 MyProcessor 处理器初始化的交换发送到 activemq:MyQueue 端点。

import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultProducerTemplate
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue, using a processor to initialize
template.send("activemq:MyQueue", new MyProcessor());

MyProcessor 类已实施,如下例所示。除了设置 In 消息正文(如此处所示)外,您还可以初始化消息标头和交换属性。

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
...
public class MyProcessor implements Processor {
    public MyProcessor() { }

    public void process(Exchange ex) {
        ex.getIn().setBody("<hello>world!</hello>");
    }
}

异步调用

异步 调用端点的方法具有 asyncSendSuffix()asyncRequestSuffix() 的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 asyncSend ()asyncSendBody () (这些方法分别发送 Exchange 对象或消息正文)。如果要强制 MEP 为 InOut (请求/回复语义),您可以调用 asyncRequestBody (), asyncRequestBodyAndHeader (),asyncRequestBodyAndHeaders () 方法。

以下示例演示了如何将交换异步发送到 direct:start 端点。asyncSend () 方法返回 java.util.concurrent.Future 对象,用于在以后检索调用结果。

import java.util.concurrent.Future;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
...
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncSend("direct:start", exchange);

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the resulting exchange from the Future
Exchange result = future.get();

producer 模板也提供异步发送消息正文的方法(例如,使用 asyncSendBody ()asyncRequestBody ())。在这种情况下,您可以使用以下帮助程序方法之一从 future 对象中提取返回的消息正文:

<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;

extractFutureBody () 方法的第一个版本会阻止,直到调用完成并且回复消息可用。extractFutureBody () 方法的第二个版本允许您指定超时。两种方法都有一个 type 参数,键入,它使用内置类型转换器将返回的消息正文发送到指定类型。

以下示例演示了如何使用 asyncRequestBody () 方法将消息正文发送到 direct:start 端点。然后,使用 blocking extractFutureBody () 方法从 future 对象检索回复消息正文。

Future<Object> future = template.asyncRequestBody("direct:start", "Hello");

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the reply message body as a String type
String result = template.extractFutureBody(future, String.class);

使用回调进行异步调用

在前面的异步示例中,请求消息在子线程中分配,而回复由主线程检索和处理。producer 模板还为您提供了选项,但处理子线程中的回复,使用 asyncCallbackSendBody () asyncCallbackRequestBody () 方法之一。在这种情况下,您可以提供一个回调对象( org.apache.camel.impl.SynchronizationAdapter 类型),它会在回复消息到达后立即在子线程中自动调用。

同步 回调接口定义如下:

package org.apache.camel.spi;

import org.apache.camel.Exchange;

public interface Synchronization {
    void onComplete(Exchange exchange);
    void onFailure(Exchange exchange);
}

在收到正常回复时调用 onComplete () 方法,并在收到错误消息回复时调用 onFailure () 方法。只有其中一个方法被调用回来,因此您必须覆盖这两个方法以确保处理所有类型的回复。

以下示例演示了如何将交换发送到 direct:start 端点,其中回复消息由 SynchronizationAdapter 回调对象在子线程中处理。

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronizationAdapter;
...
Exchange exchange = context.getEndpoint("direct:start").createExchange();
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
    @Override
    public void onComplete(Exchange exchange) {
        assertEquals("Hello World", exchange.getIn().getBody());
    }
});

其中 SynchronizationAdapter 类是 同步 接口的默认实现,您可以覆盖它来提供自己的 onComplete ()onFailure () 回调方法的定义。

您仍然可以选择从主线程访问回复,因为 asyncCallback () 方法也会返回 future 对象 iwl-MAPPING 例如:

// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);

37.1.2. 同步发送

概述

同步发送 方法是可用于调用制作者端点的方法集合,其中当前线程块直到方法调用完成,并且收到回复(若有)。这些方法与任何类型的消息交换协议兼容。

发送交换

基本的 send () 方法是一种通用方法,利用交换的消息交换模式(MEP)将 Exchange 对象的内容发送到端点。返回值是您通过制作者端点处理后获得的交换(可能包含 Out 消息,具体取决于 MEP)。

有三种 send () 方法来发送交换,允许您用以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。

Exchange send(Exchange exchange);
Exchange send(String endpointUri, Exchange exchange);
Exchange send(Endpoint endpoint, Exchange exchange);

发送由处理器填充的交换

常规 send () 方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象(详情请参阅 “使用处理器进行同步调用”一节 )。

发送由处理器填充的交换的 send () 方法允许您以以下一种方式指定目标端点: 作为默认端点、作为端点 URI 或 Endpoint 对象。此外,您还可以选择通过提供 pattern 参数来指定交换的 MEP,而不必接受默认值。

Exchange send(Processor processor);
Exchange send(String endpointUri, Processor processor);
Exchange send(Endpoint endpoint, Processor processor);
Exchange send(
    String endpointUri,
    ExchangePattern pattern,
    Processor processor
);
Exchange send(
    Endpoint endpoint,
    ExchangePattern pattern,
    Processor processor
);

发送邮件正文

如果您只关注要发送的消息正文的内容,您可以使用 sendBody () 方法将消息正文作为参数提供,并让制作者模板负责将正文插入到默认的交换对象中。

sendBody () 方法允许您用以下任一方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。此外,您还可以选择通过提供 pattern 参数来指定交换的 MEP,而不必接受默认值。没有 模式 参数的方法返回 void (即使调用在某些情况下可能会引发回复),而带有 模式 参数的方法返回 Out 消息的正文(如果存在)或 In 消息正文(另一智能)。

void sendBody(Object body);
void sendBody(String endpointUri, Object body);
void sendBody(Endpoint endpoint, Object body);
Object sendBody(
    String endpointUri,
    ExchangePattern pattern,
    Object body
);
Object sendBody(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body
);

发送邮件正文和标头

出于测试目的,尝试 单个 标头设置和 sendBodyAndHeader () 方法对这类标头测试很有用。您可以将消息正文和标头设置作为参数提供给 sendBodyAndHeader (),并让制作者模板负责将 body 和 header 设置插入到默认的交换对象中。

sendBodyAndHeader () 方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。此外,您还可以选择通过提供 pattern 参数来指定交换的 MEP,而不必接受默认值。没有 模式 参数的方法返回 void (即使调用在某些情况下可能会引发回复),而带有 模式 参数的方法返回 Out 消息的正文(如果存在)或 In 消息正文(另一智能)。

void sendBodyAndHeader(
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);

sendBodyAndHeaders () 方法与 sendBodyAndHeader () 方法类似,除了只提供单个标头设置外,这些方法允许您指定完整的标头设置映射。

void sendBodyAndHeaders(
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);

发送消息 body 和 exchange 属性

您可以使用 sendBodyAndProperty () 方法尝试设置单个 Exchange 属性的影响。您可以将消息 body 和 property 设置作为参数提供给 sendBodyAndProperty (),并让制作者模板负责将 body 和 exchange 属性插入到默认的交换对象中。

sendBodyAndProperty () 方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。此外,您还可以选择通过提供 pattern 参数来指定交换的 MEP,而不必接受默认值。没有 模式 参数的方法返回 void (即使调用在某些情况下可能会引发回复),而带有 模式 参数的方法返回 Out 消息的正文(如果存在)或 In 消息正文(另一智能)。

void sendBodyAndProperty(
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    String endpointUri,
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    Endpoint endpoint,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    String endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);

37.1.3. 使用 InOut Pattern 同步请求

概述

同步请求 方法与同步发送方法类似,但请求方法强制消息交换模式为 InOut (类似于请求/回复语义)。因此,如果您打算从制作者端点收到回复,使用同步请求方法通常方便。

请求由处理器填充的交换

基本 request () 方法是一种通用方法,它使用处理器填充默认交换,并强制消息交换模式为 InOut (因此调用模糊请求/回复语义)。返回值是您通过制作者端点处理后获得的交换,其中 Out 消息包含回复消息。

发送由处理器填充的交换的 request () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Exchange request(String endpointUri, Processor processor);
Exchange request(Endpoint endpoint, Processor processor);

请求消息正文

如果您只关注请求和回复中消息正文的内容,您可以使用 requestBody () 方法将请求消息正文作为参数提供,并使制作者模板负责将正文插入默认交换对象。

requestBody () 方法允许您用以下任一方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint 对象。返回值是回复消息的正文(Out message body),它可以返回为普通 对象,也可以使用内置类型 T 将其转换为特定类型的 T (请参阅 第 34.3 节 “built-In Type Converters”)。

Object requestBody(Object body);
<T> T requestBody(Object body, Class<T> type);
Object requestBody(
    String endpointUri,
    Object body
);
<T> T requestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Object requestBody(
    Endpoint endpoint,
    Object body
);
<T> T requestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

请求邮件正文和标头

您可以使用 requestBodyAndHeader () 方法尝试设置单个标头值的影响。您可以将消息 body 和 header 设置作为参数提供给 requestBodyAndHeader (),并让制作者模板负责将 body 和 exchange 属性插入到默认的交换对象中。

requestBodyAndHeader () 方法允许您用以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。返回值是回复消息的正文(Out message body),它可以返回为普通 对象,也可以使用内置类型 T 将其转换为特定类型的 T (请参阅 第 34.3 节 “built-In Type Converters”)。

Object requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Object requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

requestBodyAndHeaders () 方法与 requestBodyAndHeader () 方法类似,除了只提供单个标头设置外,这些方法允许您指定完整的标头设置映射。

Object requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Object requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

37.1.4. 异步发送

概述

producer 模板提供了各种异步调用制作者端点的方法,因此主线程在等待调用完成时不会阻断,并且稍后可以检索回复消息。本节中描述的异步发送方法与任何类型的消息交换协议兼容。

发送交换

基本的 asyncSend () 方法采用 Exchange 参数,并使用指定交换的消息交换模式(MEP)异步调用端点。返回值是一个 java.util.concurrent.Future 对象,它是一个 ticket,您可以在以后用于收集回复信息。有关如何从 future 对象获取返回值的详情,请参阅 “异步调用”一节

以下 asyncSend () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);

发送由处理器填充的交换

常规 asyncSend () 方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象。

以下 asyncSend () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncSend(String endpointUri, Processor processor);
Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);

发送邮件正文

如果您只关注要发送的消息正文的内容,您可以使用 asyncSendBody () 方法异步发送消息正文,并让制作者模板负责将正文插入默认交换对象。

asyncSendBody () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Object> asyncSendBody(String endpointUri, Object body);
Future<Object> asyncSendBody(Endpoint endpoint, Object body);

37.1.5. 带有 InOut 模式的异步请求

概述

异步请求 方法与异步发送方法类似,但请求方法强制消息交换模式为 InOut (类似于请求/回复语义)。因此,如果您打算从制作者端点收到回复,使用异步请求方法通常比较方便。

请求消息正文

如果您只关注请求和回复中消息正文的内容,您可以使用 requestBody () 方法将请求消息正文作为参数提供,并使制作者模板负责将正文插入默认交换对象。

asyncRequestBody () 方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。从 future 对象检索的返回值是回复消息的正文(明确消息正文),它可以作为普通 对象 返回,或使用内置类型 T 转换(请参阅 “异步调用”一节)。

Future<Object> asyncRequestBody(
    String endpointUri,
    Object body
);
<T> Future<T> asyncRequestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Future<Object> asyncRequestBody(
    Endpoint endpoint,
    Object body
);
<T> Future<T> asyncRequestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

请求邮件正文和标头

您可以使用 asyncRequestBodyAndHeader () 方法尝试设置单个标头值的影响。您可以将消息 body 和 header 设置作为参数提供给 asyncRequestBodyAndHeader (),并让制作者模板负责将 body 和 exchange 属性插入到默认的交换对象中。

asyncRequestBodyAndHeader () 方法允许您用以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。从 future 对象检索的返回值是回复消息的正文(明确消息正文),它可以作为普通 对象 返回,或使用内置类型 T 转换(请参阅 “异步调用”一节)。

Future<Object> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

asyncRequestBodyAndHeaders () 方法与 asyncRequestBodyAndHeader () 方法类似,除了仅提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。

Future<Object> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

37.1.6. 使用回调进行异步发送

概述

producer 模板还提供了在用于调用制作者端点的同一子线程中处理回复消息的选项。在这种情况下,您提供了一个回调对象,它会在收到回复消息后立即在子线程中自动调用。换句话说,使用回调方法进行异步发送 可让您在主线程中启动调用,然后让所有关联的处理 producer 端点的相关处理,等待一个回复,并在子线程中异步处理回复( response-thread)。

发送交换

基本的 asyncCallback () 方法采用 Exchange 参数,并使用指定交换的消息交换模式(MEP)异步调用端点。这个方法与用于交换的 asyncSend () 方法类似,但它使用了额外的 org.apache.camel.spi.Synchronization 参数,它是一个带有两种方法的回调接口: onComplete ()onFailure ()。有关如何使用 同步 回调的详情,请参考 “使用回调进行异步调用”一节

以下 asyncCallback () 方法允许您用以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncCallback(
    String endpointUri,
    Exchange exchange,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Exchange exchange,
    Synchronization onCompletion
);

发送由处理器填充的交换

处理器的 asyncCallback () 方法调用处理器来填充默认交换,并强制消息交换模式为 InOut (因此调用模糊请求/回复语义)。

以下 asyncCallback () 方法允许您用以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Exchange> asyncCallback(
    String endpointUri,
    Processor processor,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Processor processor,
    Synchronization onCompletion
);

发送邮件正文

如果您只关注要发送的消息正文的内容,您可以使用 asyncCallbackSendBody () 方法异步发送消息正文,并让制作者模板负责将正文插入默认交换对象。

asyncCallbackSendBody () 方法允许您用以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Object> asyncCallbackSendBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackSendBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);

请求消息正文

如果您只关注请求和回复中消息正文的内容,您可以使用 asyncCallbackRequestBody () 方法将请求消息正文作为参数提供,并让制作者模板负责将正文插入到默认的交换对象中。

asyncCallbackRequestBody () 方法允许您用以下一种方式指定目标端点: 作为端点 URI 或 Endpoint 对象。

Future<Object> asyncCallbackRequestBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackRequestBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.