第 38 章 实施组件
摘要
本章介绍了可用于实施 Apache Camel 组件的方法的一般概述。
38.1. 组件架构
38.1.1. 组件工厂模式
概述
Apache Camel 组件由一组通过工厂模式相互相关的类组成。主入口点指向组件对象本身( org.apache.camel.
)。您可以使用 Component
类型的实例组件
对象作为工厂创建 Endpoint
对象,后者充当创建 消费者
、Producer
和 Exchange
对象的因素。这些关系总结在 图 38.1 “组件工厂模式”
图 38.1. 组件工厂模式
组件
组件实现是一个端点工厂。组件实施者的主要任务是实施 Component.createEndpoint ()
方法,它负责根据需要创建新端点。
每个组件都必须与端点 URI 中显示的 组件前缀 关联。例如,文件组件通常与 文件 前缀关联,该前缀可在 file://tmp/messages/input 等端点 URI 中使用。在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实施组件的类名称之间的关联。
端点
每个端点实例封装特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。
端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义以下工厂方法:
-
createConsumer ()
和createPollingConsumer ()
HEKETI-wagonCreates 是一个消费者端点,它代表路由开头的源端点。 -
createProducer ()
5-4Creates a producer 端点,它代表路由末尾的目标端点。 -
createExchange ()
HEKETI-wagonCreates 是一个交换对象,它封装了传递和关闭路由的消息。
消费者
消费者端点 使用 请求。它们始终出现在路由的开头,它们封装负责接收传入请求和分配传出回复的代码。从面向服务的角度来说,使用者代表 服务。
消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,您可以遵循许多不同的模式。这些模式在 第 38.1.3 节 “消费者模式和线程” 中进行了描述。
制作者
生产者端点 生成 请求。它们始终出现在路由的末尾,它们封装负责分配传出请求并接收传入回复的代码。从面向服务的视角中,生成者代表 服务消费者。
生产者必须实施 org.apache.camel.Producer
接口。您可以选择实施制作者,以支持异步处理风格。详情请查看 第 38.1.4 节 “异步处理”。
Exchange
Exchange 对象封装一组相关的消息。例如,一种消息交换是同步调用,由请求消息及其相关回复组成。
交换必须实施 org.apache.camel.Exchange 接口。默认实施 DefaultExchange
足以满足许多组件实现的情况。但是,如果您想要将额外的数据与交换相关联,或者有交换前进行额外的处理,那么自定义交换实施会很有用。
消息
- 在消息 swig-Cloneholds the current 信息中。
- out message swig-wagontemporarily 包含回复信息。
所有消息类型都由同一 Java 对象 org.apache.camel.Message
表示。并不总是需要自定义消息实现 swig-wagonthe default 实现 DefaultMessage
,通常就足够了。
38.1.2. 在路由中使用组件
概述
Apache Camel 路由本质上是 org.apache.camel.Processor 类型的处理器管道。消息封装在交换对象 E
中,通过调用 process ()
方法从节点传递到节点。处理器管道的架构在 图 38.2 “路由中的消费者和生成实例” 中进行了说明。
图 38.2. 路由中的消费者和生成实例
源端点
在路由开始时,您有源端点,该端点由 org.apache.camel.Consumer
对象表示。源端点负责接受传入请求消息并分配回复。在构建路由时,Apache Camel 根据端点 URI 的组件前缀创建适当的 Consumer
类型,如 第 38.1.1 节 “组件工厂模式” 所述。
Processors
管道中的每个中间节点都由处理器对象(实现 org.apache.camel.Processor 接口)表示。您可以插入标准处理器(例如,过滤
、throttler
或 delayer
),或者插入您自己的自定义处理器实现。
目标端点
在路由的末尾,目标端点由 org.apache.camel.Producer
对象表示。由于它位于处理器管道的末尾,因此生成者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据端点 URI 的组件前缀创建适当的 Producer
类型。
38.1.3. 消费者模式和线程
概述
用于实现使用者的模式决定了处理传入交换时使用的线程模型。消费者可使用以下模式之一实施:
- 事件驱动的模式 HEKETI -wagon The consumer 由外部线程驱动。
- 调度的轮询模式 HEKETI-Clone The consumer 由专用线程池驱动。
- 轮询模式 HEKETI-Clone 线程模型未定义。
事件驱动的模式
在事件驱动的模式中,当应用的另一部分(通常是第三方库)调用由消费者实施的方法时,会启动传入请求的处理。事件驱动的消费者的一个很好的例子是 Apache Camel JMX 组件,其中事件由 JMX 库启动。JMX 库调用 handleNotification ()
方法来发起请求处理 processing swig-wagon 请参见 例 41.4 “JMXConsumer 实现” 以了解详细信息。
图 38.3 “event-Driven Consumer” 显示事件驱动的消费者模式的概要。在本例中,假设处理是由对 notify()
方法的调用触发的。
图 38.3. event-Driven Consumer
事件驱动的消费者处理传入的请求,如下所示:
消费者必须实施接收传入事件的方法(在 图 38.3 “event-Driven Consumer” 中,由
notify()
方法表示)。调用notify()
的线程通常是应用的独立部分,因此消费者的线程策略是外部驱动的。例如,如果是 JMX 消费者实施,使用者实施
NotificationListener.handleNotification ()
方法,以便从 JMX 接收通知。驱动消费者处理的线程是在 JMX 层中创建的。-
在
notify()
方法的正文中,消费者首先将传入的事件转换为交换对象E
,然后在路由中的下一个处理器上调用process ()
,并将交换对象作为参数传递。
调度的轮询模式
在调度的轮询模式中,使用者定期检查请求是否到达,从而检索传入的请求。检查请求由内置的计时器类(计划的 executor 服务)自动调度,该服务 是由 java.util.concurrent 库提供的标准模式。调度的 executor 服务以时段执行特定的任务,并且管理一个用于运行任务实例的线程池。
图 38.4 “Scheduled Poll Consumer” 显示调度的轮询消费者模式的概要。
图 38.4. Scheduled Poll Consumer
调度的轮询消费者处理传入的请求,如下所示:
-
调度的 executor 服务有一个线程池,可用于启动消费者处理。在各个调度的时间间隔被过后,调度的 executor 服务会尝试从池中获取可用线程(默认为池中的五个线程)。如果有可用的线程可用,它会使用该线程调用消费者的
poll ()
方法。 -
消费者的
poll ()
方法旨在触发传入请求的处理。在poll ()
方法的正文中,使用者会尝试检索传入的消息。如果没有可用的请求,poll ()
方法会立即返回。 -
如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用
process ()
,并将交换对象作为其参数。
轮询模式
在轮询模式中,当第三方调用消费者的轮询方法之一时,启动传入请求的处理:
-
receive()
-
receiveNoWait()
-
receive (长超时)
组件实施最多用于定义在轮询方法上启动调用的确切机制。这种机制不在轮询模式中指定。
图 38.5 “polling Consumer” 显示轮询消费者模式的概述。
图 38.5. polling Consumer
轮询消费者处理传入的请求,如下所示:
- 每当调用使用者轮询方法之一时,都会启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
在
receive ()
方法的正文中,使用者会尝试检索传入的请求消息。如果当前没有消息,则行为取决于调用哪个接收方法。-
receiveNoWait ()
立即返回 -
receive (长超时)
等待指定的超时间隔[2] 返回前 -
receive ()
等待收到消息
-
-
如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用
process ()
,并将交换对象作为其参数。
38.1.4. 异步处理
概述
在处理交换时,生成者端点通常遵循 同步 模式。当生成者上的管道调用 process ()
中的上述处理器时,process ()
方法会阻止,直至收到回复。在这种情况下,处理器的线程会被阻断,直到生成者完成发送请求和接收回复的周期。
但是,有时您可能希望将前面的处理器与生成者分离,因此处理器的线程会立即释放,并且 process ()
调用 不会 阻断。在这种情况下,您应该使用 异步 模式实施制作者,这为前面的处理器提供调用 process ()
方法的非阻塞版本的选项。
为了让您了解不同的实施选项,本节描述了用于实施制作者端点的同步和异步模式。
同步制作者
图 38.6 “同步 Producer” 显示同步制作者的概述,其中前面的处理器块直到生产者完成交换为止。
图 38.6. 同步 Producer
同步制作者按如下方式处理交换:
-
管道中的前面的处理器调用制作者上的
同步进程
() 方法来启动同步处理。同步process ()
方法采用单个交换参数。 -
在
process ()
方法的正文中,生成者将请求(消息)发送到端点。 -
如果交换模式需要,生成者会等待回复(Out 消息)到达端点。此步骤可能会导致
process ()
方法无限期阻止。但是,如果交换模式不强制回复,则process ()
方法可以在发送请求后立即返回。 -
当
process ()
方法返回时,交换对象包含来自同步调用的回复( Out 消息消息)。
异步制作者
图 38.7 “异步生成” 显示了异步制作者的概要,其中生成者在子线程中处理交换,并且前面的处理器不会因任何显著时长而被阻止。
图 38.7. 异步生成
异步制作者按如下方式处理交换:
-
在处理器可以调用异步
process ()
方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由的返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。 处理器调用制作者上的异步
process ()
方法,以启动异步处理。异步process ()
方法采用两个参数:- 交换对象
- 同步回调对象
-
在
process ()
方法的正文中,生成者会创建一个可封装处理代码的可运行
对象。然后,生产者将此可运行
对象的执行委派给子线程。 -
异步
process ()
方法返回,从而释放处理器的线程。交换处理将继续在不同的子线程中。 -
Runnable
对象将 In 消息发送到端点。 -
如果交换模式需要,Runnable 对象会等待回复(Out 或 Fault 消息)到达端点。
Runnable
对象
会被阻断,直到收到回复为止。 -
回复到达后,Run
nable
对象会将回复(Out 消息)插入到交换对象中,然后在异步回调对象上调用done ()
。然后,异步回调负责处理回复消息(在子线程中执行)。