第 38 章 实现组件
摘要
本章介绍了可用于实施 Apache Camel 组件的方法的一般概述。
38.1. 组件架构
38.1.1. 组件的工厂模式
概述
Apache Camel 组件由一组通过工厂模式相互相关的类组成。组件的主要入口点是组件对象本身( org.apache.camel.
类型的实例)。您可以使用 Component
组件
对象作为工厂创建 Endpoint
对象,后者作为创建 Consumer
、Producer
和 Exchange
对象的工厂。这些关系总结在 图 38.1 “组件工厂模式”
图 38.1. 组件工厂模式
组件
组件实现是端点工厂。组件实施器的主要任务是实现 Component.createEndpoint ()
方法,它负责按需创建新端点。
每种组件都必须与端点 URI 中显示的 组件前缀 关联。例如,文件组件通常与 文件 前缀关联,该前缀可在端点 URI 中使用,如 file://tmp/messages/input。在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实现组件的类名称之间的关联。
端点
每个端点实例封装特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。
端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义以下工厂方法:
-
createConsumer ()
和createPollingConsumer ()
ProductShortName-wagonCreates 是一个消费者端点,它代表路由开头的源端点。 -
createProducer ()
criu- iwlCreates a producer 端点,它代表路由末尾的目标端点。 -
createExchange ()
mvapich- iwlCreates 一个交换对象,它封装了路由传递和关闭的消息。
消费者
消费者端点 消耗 请求。它们始终显示在路由开始时,并封装负责接收传入请求并发送传出回复的代码。从面向服务的角度来说,消费者代表 服务。
消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,您可以遵循多种不同的模式。这些模式在 第 38.1.3 节 “消费者模式和线程” 中进行了描述。
制作者
制作者端点 生成 请求。它们始终显示在路由的末尾,并封装负责分配传出请求并接收传入的回复的代码。从面向服务的角度来说,生成者代表 服务消费者。
生产者必须实施 org.apache.camel.Producer
接口。您可以选择实施制作者来支持异步处理。详情请查看 第 38.1.4 节 “异步处理”。
Exchange
Exchange 对象封装一组相关的消息。例如,一种消息交换是同步调用,它由请求消息及其相关回复组成。
交换必须实施 org.apache.camel.Exchange 接口。默认的实现( DefaultExchange
)足以满足许多组件实现的。但是,如果您想将额外的数据与交换相关联,或者具有交换预先处理,则自定义交换实施非常有用。
消息
- 在 message iwl-wagonholds current message 中。
- 出 message iwl-wagontemporarily 包含回复消息。
所有消息类型都由相同的 Java 对象 org.apache.camel.Message
表示。通常并非始终需要自定义消息实现的 implementation implementation implementation DefaultMessage
。
38.1.2. 在路由中使用组件
概述
Apache Camel 路由基本上是 org.apache.camel.Processor 类型的处理器管道。消息封装在交换对象 E
中,后者通过调用 process ()
方法从节点传递到节点。处理器管道的架构在 图 38.2 “路由中的消费者和 Producer 实例” 中显示。
图 38.2. 路由中的消费者和 Producer 实例
源端点
在路由开始时,您有源端点,该端点由 org.apache.camel.Consumer
对象表示。源端点负责接受传入的请求消息并发送回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Consumer
类型,如 第 38.1.1 节 “组件的工厂模式” 所述。
处理器
管道中的每个中间节点都由一个处理器对象表示(实施 org.apache.camel.Processor 接口)。您可以插入标准处理器(例如,过滤
、throttler
或 delayer
),或插入您自己的自定义处理器实现。
目标端点
在路由结束时,目标端点由 org.apache.camel.Producer
对象表示。由于它位于处理器管道的末尾,因此生产者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Producer
类型。
38.1.3. 消费者模式和线程
概述
用于实现消费者的模式决定了处理传入交换时使用的线程模型。可使用以下模式之一实施消费者:
- 事件驱动的模式 mvapich - iwl - 使用者由外部线程驱动。
- 调度的轮询模式 mvapich- iwlThe 使用者由专用线程池驱动。
- 轮询模式 mvapich-wagon 线程模型未定义。
事件驱动的模式
在事件驱动的模式中,当应用的另一个部分(通常是第三方库)调用由使用者实施的方法时,会启动传入请求的处理。事件驱动的消费者的一个很好的例子是 Apache Camel JMX 组件,其中由 JMX 库启动事件。JMX 库调用 handleNotification ()
方法,来发起请求处理 TOTP-mvapichsee 例 41.4 “JMXConsumer 实现” 以了解详细信息。
图 38.3 “event-Driven Consumer” 显示事件驱动的消费者模式的概述。在本例中,假设处理是由调用 notify()
方法触发的。
图 38.3. event-Driven Consumer
事件驱动的消费者处理传入的请求,如下所示:
消费者必须实施一种方法来接收传入事件(在 图 38.3 “event-Driven Consumer” 中,这由
notify()
方法表示)。调用通知()
的线程通常是应用的独立部分,因此消费者的线程策略是外部的。例如,如果实现 JMX 消费者,消费者实施
NotificationListener.handleNotification ()
方法,以接收来自 JMX 的通知。驱动消费者处理的线程在 JMX 层内创建。-
在
notify()方法的
正文中,消费者首先将传入事件转换为交换对象E
,然后在路由中的下一个处理器上调用process ()
,将 Exchange 对象作为其参数。
调度的轮询模式
在调度的轮询模式中,消费者定期检查请求是否到达,从而检索传入的请求。检查请求由内置计时器类( 调度的 executor 服务 )自动调度,后者是由 java.util.concurrent 库提供的标准模式。调度的 executor 服务以时间间隔执行特定的任务,它还管理一个线程池,用于运行任务实例。
图 38.4 “调度的 Poll Consumer” 显示计划的轮询消费者模式的概述。
图 38.4. 调度的 Poll Consumer
调度的轮询消费者处理传入的请求,如下所示:
-
调度的 executor 服务有一个线程池,可用于启动消费者处理。每次调度的时间间隔后,调度的 executor 服务会尝试从其池中获取空闲线程(默认为池中的五个线程)。如果有空闲线程可用,它会使用该线程调用消费者的
poll ()
方法。 -
消费者的
poll ()
方法旨在触发传入请求的处理。在poll ()
方法的正文中,消费者会尝试检索传入的消息。如果没有可用的请求,poll ()
方法会立即返回。 -
如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用
process ()
,将 exchange 对象作为其参数。
轮询模式
在轮询模式中,当第三方调用其中一个消费者轮询方法时,会启动传入请求的处理:
-
receive()
-
receiveNoWait()
-
receive (long timeout)
取决于组件实施,用于定义在轮询方法上启动调用的精确机制。这种机制没有在轮询模式下指定。
图 38.5 “polling Consumer” 显示轮询消费者模式的概述。
图 38.5. polling Consumer
轮询消费者处理传入的请求,如下所示:
- 每当调用其中一个消费者的轮询方法时,会启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
在
receive ()
方法的正文中,消费者尝试检索传入的请求消息。如果当前没有可用的消息,则行为取决于调用哪个接收方法。-
receiveNoWait ()
立即返回 -
receive (长超时)
等待指定的超时时间[2] 返回前 -
receive ()
等待消息被接收
-
-
如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用
process ()
,将 exchange 对象作为其参数。
38.1.4. 异步处理
概述
生产者端点通常在处理交换时遵循 同步 模式。当管道前面的处理器在生成者上调用 process ()
时,process ()
方法会阻止,直到收到回复为止。在这种情况下,处理器的线程会卡住,直到生成者完成发送请求并接收回复的周期。
但是,您可能更喜欢将前面的处理器与生成者分离,因此处理器的线程会立即释放,且 process ()
调用 不会 阻断。在这种情况下,您应该使用 异步 模式实施制作者,这为前面的处理器提供调用 process ()
方法的非阻塞版本的选项。
为了让您了解不同的实施选项,本节描述了实施制作者端点的同步和异步模式。
同步制作者
图 38.6 “同步 Producer” 显示同步制作者的概述,其中前面的处理器块直到生成者完成处理交换为止。
图 38.6. 同步 Producer
同步制作者按如下方式处理交换:
-
管道中的前面的处理器调用制作者上的同步
process ()
方法,以启动同步处理。同步process ()
方法采用单个交换参数。 -
在
process ()
方法的正文中,生成者将请求(In message)发送到端点。 -
如果交换模式需要,生成者会等待回复(传出 消息)到达端点。此步骤可能会导致
process ()
方法无限期阻止。但是,如果交换模式不强制回复,则process ()
方法可以在发送请求后立即返回。 -
当
process ()
方法返回时,exchange 对象包含来自同步调用的回复(明确消息消息)。
异步制作者
图 38.7 “异步 Producer” 显示异步制作者的概述,其中生成者在子线程中处理交换,并且前面的处理器在任何显著长度没有被阻止。
图 38.7. 异步 Producer
异步制作者按如下方式处理交换:
-
在处理器可以调用异步
process ()
方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由的返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。 处理器调用制作者上的异步
process ()
方法,以启动异步处理。异步process ()
方法采用两个参数:- 一个交换对象
- 同步回调对象
-
在
process ()
方法的正文中,生成者会创建一个可运行的
对象来封装处理代码。然后,生成者会将此 Runnable对象的
执行委托给子线程。 -
异步
process ()
方法返回,从而释放处理器的线程。交换处理在单独的子线程中继续。 -
Runnable
对象将 In 消息发送到端点。 -
如果交换模式需要,Runnable 对象会等待回复(Out 或 Fault 消息)到达端点。
在收到回复前,Runnable 对象会一直被阻止。
-
在回复到达后,
Run
nable 对象将回复(Out 消息)插入到交换对象中,然后在异步回调对象上调用done ()
。然后,异步回调负责处理回复消息(在子线程中执行)。