第 37 章 生产者和消费者模板
摘要
Apache Camel 中的生成者和消费者模板在 Spring 容器 API 的功能后建模,通过一个简单易用的 API (称为 模板 )来提供资源的访问。如果是 Apache Camel,生产者模板和消费者模板提供了简化的接口,用于向和接收来自生成者端点和消费者端点的消息。
37.1. 使用 Producer 模板 复制链接链接已复制到粘贴板!
37.1.1. Producer 模板简介 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
producer 模板支持各种不同的方法来调用制作者端点。有方法支持请求消息的不同格式(作为 Exchange
对象,作为消息正文,作为带单个标头设置的消息正文,等等),有支持同步和异步调用方式的方法。总体而言,生成者模板方法可分为以下类别:
或者,请参阅 第 37.2 节 “使用 Fluent Producer 模板”。
同步调用 复制链接链接已复制到粘贴板!
同步调用端点的方法具有 发送后缀()和
的形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 请求
后缀()send ()
, sendBody ()
, 和 sendBodyAndHeader ()
(这些方法分别发送 Exchange
对象、消息正文或消息正文和标头值)。如果要强制 MEP 为 InOut (请求/回复语义),您可以改为调用 request ()
、requestBody ()
和 requestBodyAndHeader ()
方法。
以下示例演示了如何创建 ProducerTemplate
实例,并使用它来发送消息正文到 activemq:MyQueue
端点。示例还演示了如何使用 sendBodyAndHeader ()
发送消息正文和标头值。
使用处理器进行同步调用 复制链接链接已复制到粘贴板!
同步调用的一个特殊情况是您为 send ()
方法提供 Processor
参数而不是 Exchange
参数。在这种情况下,生成者模板会隐式要求指定的端点创建 Exchange
实例(通常并非始终默认具有 InOnly MEP)。然后,此默认交换传递到处理器,它将初始化交换对象的内容。
以下示例演示了如何将 MyProcessor
处理器初始化的交换发送到 activemq:MyQueue
端点。
MyProcessor
类按照以下示例所示实施。除了设置 In message body (如此处所示),您还可以初始化消息标题和交换属性。
异步调用 复制链接链接已复制到粘贴板!
异步 调用端点的方法具有 asyncSendSuffix()和
形式的名称。例如,使用默认消息交换模式(MEP)或明确指定的 MEP 调用端点的方法被命名为 asyncRequestSuffix()
asyncSend ()
和 asyncSendBody ()
(其中这些方法分别发送 Exchange
对象或消息正文)。如果要强制 MEP 为 InOut (request/reply 语义),您可以调用 asyncRequestBody ()
、syncRequestBodyAndHeader ()、syncRequestBodyAndHeader ()
和 asyncRequestBodyAndHeaders ()
方法。
以下示例演示了如何异步向 direct:start
端点发送交换。asyncSend ()
方法返回一个 java.util.concurrent.Future
对象,用于稍后检索调用结果。
producer 模板还提供异步发送消息正文的方法(例如,使用 asyncSendBody ()
或 asyncRequestBody ()
)。在这种情况下,您可以使用以下帮助程序方法之一从 Future
对象中提取返回的消息正文:
<T> T extractFutureBody(Future future, Class<T> type); <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
extractFutureBody ()
方法块的第一个版本,直到调用完成并且回复消息可用为止。extractFutureBody ()
方法的第二个版本允许您指定超时。两种方法都具有类型参数,键入
,它将返回的消息正文转换为指定类型(使用内置类型转换器)。
以下示例演示了如何使用 asyncRequestBody ()
方法将消息正文发送到 direct:start
端点。然后,使用 blocking extractFutureBody ()
方法从 Future
对象检索回复消息正文。
使用回调进行异步调用 复制链接链接已复制到粘贴板!
在前面的异步示例中,请求消息在子线程中分配,而回复则由主线程检索和处理。producer 模板还为您提供了选项,但子线程中的处理回复使用 asyncCallback ()
, , asyncCallbackSendBody ()
方法之一,或 asyncCallbackRequestBody ()
方法。在这种情况下,您可以提供一个回调对象( org.apache.camel.impl.SynchronizationAdapter
类型),它会在回复消息到达时在子线程中自动调用。
同步
回调接口定义如下:
如果在收到正常回复时调用 onComplete ()
方法,并且在收到错误消息回复时调用 onFailure ()
方法。只有其中一种方法会调用回来,因此您必须覆盖这两个方法,以确保处理所有类型的回复。
以下示例演示了如何将交换发送到 direct:start
端点,其中 SynchronizationAdapter
回调对象在子线程中处理回复消息。
其中
类是同步接口的默认实现,您可以覆盖它来提供您自己对 Synchronization
AdapteronComplete ()
和 onFailure ()
回调方法的定义。
您仍然有从主线程访问回复的选项,因为 asyncCallback ()
方法也会返回 Future
object swig-wagon 例如:
// Retrieve the reply from the main thread, specifying a timeout Exchange reply = future.get(10, TimeUnit.SECONDS);
// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);
37.1.2. 同步发送 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
同步发送 方法是一组可用于调用制作者端点的方法集合,其中当前线程块直到方法调用完成并且收到回复(若有)。这些方法与任何类型的消息交换协议兼容。
发送交换 复制链接链接已复制到粘贴板!
基本的 send ()
方法是一种通用方法,利用交换的消息交换模式(MEP)将 Exchange
对象的内容发送到端点。返回值是您在生成者端点处理后获取的交换(可能包含 Out 消息,具体取决于 MEP)。
发送交换有三种 send ()
方法,允许您以以下一种方式指定目标端点: 作为默认端点、端点、端点 URI 或 Endpoint
对象。
Exchange send(Exchange exchange); Exchange send(String endpointUri, Exchange exchange); Exchange send(Endpoint endpoint, Exchange exchange);
Exchange send(Exchange exchange);
Exchange send(String endpointUri, Exchange exchange);
Exchange send(Endpoint endpoint, Exchange exchange);
发送由处理器填充的交换 复制链接链接已复制到粘贴板!
常规 send ()
方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象(详情请参阅 “使用处理器进行同步调用”一节 )。
发送由处理器填充的交换的 send ()
方法可让您以以下一种方式指定目标端点: 作为默认端点、端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。
发送消息正文 复制链接链接已复制到粘贴板!
如果您只关注您要发送的消息正文的内容,您可以使用 sendBody ()
方法提供消息正文作为参数,并让制作者模板将正文插入到默认交换对象中。
sendBody ()
方法允许您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。没有 模式
参数的方法返回 void
(即使调用在某些情况下可能会给回复),并且具有 pattern
参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。
发送消息正文和标题 复制链接链接已复制到粘贴板!
出于测试目的,尝试 单个 标头设置的影响和 sendBodyAndHeader ()
方法对这种标头测试很有用。您提供消息正文和标头设置作为 sendBodyAndHeader ()
的参数,并允许制作者模板将正文和标头设置插入到默认交换对象中。
sendBodyAndHeader ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。没有 模式
参数的方法返回 void
(即使调用在某些情况下可能会给回复),并且具有 pattern
参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。
sendBodyAndHeaders ()
方法与 sendBodyAndHeader ()
方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。
发送消息正文和交换属性 复制链接链接已复制到粘贴板!
您可以使用 sendBodyAndProperty ()
方法尝试设置单个交换属性的影响。您提供消息正文和属性设置作为 sendBodyAndProperty ()
的参数,并让制作者模板负责将正文和交换属性插入到默认的交换对象中。
sendBodyAndProperty ()
方法允许您以以下一种方式指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。此外,您还可以通过提供 pattern
参数来指定交换的 MEP,而不接受默认值。没有 模式
参数的方法返回 void
(即使调用在某些情况下可能会给回复),并且具有 pattern
参数的方法会返回 Out 消息的正文(如果有一个)或 In 消息的正文(否则为准)。
37.1.3. 使用 InOut Pattern 同步请求 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
同步请求 方法与同步发送方法类似,但请求方法强制消息交换模式为 InOut (代表请求/回复语义)。因此,如果您预期从生成者端点收到回复,通常最好使用同步请求方法。
请求由处理器填充的交换 复制链接链接已复制到粘贴板!
基本 request ()
方法是一种通用方法,它使用处理器填充默认交换,并强制消息交换模式成为 InOut (因此调用遵循请求/回复语义)。返回值是您在生成者端点处理后获取的交换,其中 Out 消息包含回复消息。
发送由处理器填充的交换的 request ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Exchange request(String endpointUri, Processor processor); Exchange request(Endpoint endpoint, Processor processor);
Exchange request(String endpointUri, Processor processor);
Exchange request(Endpoint endpoint, Processor processor);
请求消息正文 复制链接链接已复制到粘贴板!
如果您只关注请求中的消息正文内容,并在回复中,您可以使用 requestBody ()
方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认的交换对象中。
requestBody ()
方法允许您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI 或 Endpoint
对象。返回值是回复消息的正文(Out message body),可以是 plain Object
,也可以使用内置类型转换器(请参阅 第 34.3 节 “built-In Type Converters”)转换为特定类型的 T
。
请求消息正文和标头 复制链接链接已复制到粘贴板!
您可以使用 requestBodyAndHeader ()
方法尝试设置单个标头值的影响。您提供消息正文和标头设置作为 requestBodyAndHeader ()
的参数,并允许制作者模板将正文和交换属性插入到默认交换对象中。
requestBodyAndHeader ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。返回值是回复消息的正文(Out message body),可以是 plain Object
,也可以使用内置类型转换器(请参阅 第 34.3 节 “built-In Type Converters”)转换为特定类型的 T
。
requestBodyAndHeaders ()
方法与 requestBodyAndHeader ()
方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。
37.1.4. 异步发送 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
producer 模板提供各种异步调用制作者端点的方法,以便在等待调用完成时阻止主线程,并且稍后可以检索回复消息。本节中描述的异步发送方法与任何类型的消息交换协议兼容。
发送交换 复制链接链接已复制到粘贴板!
基本的 asyncSend ()
方法采用 Exchange
参数,并使用指定交换的消息交换模式(MEP)异步调用端点。返回值是一个 java.util.concurrent.Future
对象,它是稍后用来收集回复消息的票据,请参阅 “异步调用”一节。
以下 asyncSend ()
方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Exchange> asyncSend(String endpointUri, Exchange exchange); Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
发送由处理器填充的交换 复制链接链接已复制到粘贴板!
常规 asyncSend ()
方法的一个简单变体是使用处理器填充默认交换,而不是显式提供交换对象。
以下 asyncSend ()
方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Exchange> asyncSend(String endpointUri, Processor processor); Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
Future<Exchange> asyncSend(String endpointUri, Processor processor);
Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
发送消息正文 复制链接链接已复制到粘贴板!
如果您只关注您要发送的消息正文的内容,您可以使用 asyncSendBody ()
方法异步发送消息正文,并允许制作者模板将正文插入到默认交换对象中。
asyncSendBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
Future<Object> asyncSendBody(String endpointUri, Object body); Future<Object> asyncSendBody(Endpoint endpoint, Object body);
Future<Object> asyncSendBody(String endpointUri, Object body);
Future<Object> asyncSendBody(Endpoint endpoint, Object body);
37.1.5. 使用 InOut Pattern 的异步请求 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
异步请求 方法与异步发送方法类似,但请求方法强制消息交换模式为 InOut (代表请求/回复语义)。因此,如果您预期从生成者端点收到回复,通常最好使用异步请求方法。
请求消息正文 复制链接链接已复制到粘贴板!
如果您只关注请求中的消息正文内容,并在回复中,您可以使用 requestBody ()
方法提供请求消息正文作为参数,并让制作者模板将正文插入到默认的交换对象中。
asyncRequestBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。从 Future
对象检索的返回值是回复消息的正文(Out message body),它可以作为普通 对象
返回,或使用内置的类型 T
转换程序(请参阅 “异步调用”一节)。
请求消息正文和标头 复制链接链接已复制到粘贴板!
您可以使用 asyncRequestBodyAndHeader ()
方法尝试设置单个标头值的影响。您提供消息正文和标头设置作为 asyncRequestBodyAndHeader ()
的参数,并让制作者模板负责将正文和交换属性插入到默认交换对象中。
asyncRequestBodyAndHeader ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。从 Future
对象检索的返回值是回复消息的正文(Out message body),它可以作为普通 对象
返回,或使用内置的类型 T
转换程序(请参阅 “异步调用”一节)。
asyncRequestBodyAndHeaders ()
方法与 asyncRequestBodyAndHeader ()
方法类似,除了提供单个标头设置外,这些方法允许您指定标头设置的完整散列映射。
37.1.6. 使用回调异步发送 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
producer 模板还提供 选项,用于处理用于调用制作者端点的同一子线程中的回复消息。在这种情况下,您可以提供一个回调对象,它会在收到回复消息后立即在子线程中调用。换句话说,使用回调方法的异步发送 可让您在主线程中启动调用,然后具有生成者端点的所有关联处理因此,等待回复并在子线程中异步处理回复。
发送交换 复制链接链接已复制到粘贴板!
基本 asyncCallback ()
方法采用 Exchange
参数,并使用指定交换的消息交换模式(MEP)异步调用端点。这个方法类似于交换的 asyncSend ()
方法,但它采用额外的 org.apache.camel.spi.Synchronization
参数,它是一个具有两种方法: onComplete ()
和 onFailure ()
的回调接口。有关如何使用 同步
回调的详情,请参考 “使用回调进行异步调用”一节。
以下 asyncCallback ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
发送由处理器填充的交换 复制链接链接已复制到粘贴板!
处理器的 asyncCallback ()
方法调用处理器来填充默认交换,并强制消息交换模式成为 InOut (因此调用模糊请求/回复语义)。
以下 asyncCallback ()
方法可让您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
发送消息正文 复制链接链接已复制到粘贴板!
如果您只关注您要发送的消息正文的内容,您可以使用 asyncCallbackSendBody ()
方法异步发送消息正文,并允许制作者模板将正文插入到默认交换对象中。
asyncCallbackSendBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。
请求消息正文 复制链接链接已复制到粘贴板!
如果您只关注请求中的消息正文内容,并在回复中,您可以使用 asyncCallbackRequestBody ()
方法提供请求消息正文作为参数,并使制作者模板负责将正文插入到默认的交换对象中。
asyncCallbackRequestBody ()
方法允许您以以下一种方式指定目标端点: 作为端点 URI 或 Endpoint
对象。