第 38 章 实施组件
摘要
本章一般概述可用于实施 Apache Camel 组件。
38.1. 组件架构
38.1.1. 组件的工厂模式
概述
Apache Camel 组件由一组通过工厂模式相互关联的类组成。组件的主要入口点是 Component
对象本身( org.apache.camel.Component
类型的实例)。您可以使用 component 对象作为工厂来创建 Endpoint
对象,后者又是创建 使用者
、Producer
和 Exchange
对象的工厂。这些关系在 中进行了概述 图 38.1 “组件 onnectionFactoryy Patterns”
图 38.1. 组件 onnectionFactoryy Patterns
组件
组件实施是一个端点工厂。组件实施器的主要任务是实施 component .createEndpoint()
方法,它负责根据需要创建新端点。
每种组件必须与出现在端点 URI 中的组件前缀 关联。例如,文件组件通常与 文件 前缀关联,该前缀可以在 file://tmp/messages/input 等端点 URI 中使用。当您在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实现组件的类名称之间的关联。
端点
每个端点实例封装了特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它都会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。
端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义了以下工厂方法:
-
createConsumer()
和createPollingConsumer()
abrt-活动Creates 是一个消费者端点,代表路由开头的源端点。 -
createProducer()
>_<-jaxbCreates a producer 端点,它代表路由末尾的目标端点。 -
createExchange()
InventoryService-jaxbCreates a exchange 对象,用于封装通过和关闭路由的消息。
消费者
消费者端点 会消耗 请求。它们始终显示在路由开始时,并封装负责接收传入请求和分配传出回复的代码。从面向服务的视角来看,使用者代表 服务。
消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,可以遵循多种不同模式。这些模式在 第 38.1.3 节 “消费者模式和线程” 中所述。
producer
生产者端点 生成 请求。它们始终出现在路由的末尾,并封装负责分配传出请求和接收传入的回复的代码。从面向服务的视角来看,生产者代表服务消费者 。
制作者必须实施 org.apache.camel.Producer
接口。您可以选择实施制作者来支持异步处理方式。详情请查看 第 38.1.4 节 “异步处理”。
Exchange
Exchange 对象封装一组相关的消息。例如,一种消息交换是同步调用,它由请求消息和相关回复组成。
Exchanges 必须实施 org.apache.camel.Exchange 接口。默认的实现( DefaultExchange
)足以满足许多组件实施。但是,如果您要将额外的数据与交换相关联,或者交换了额外处理,那么自定义交换实施会很有用。
消息
- 在 message 这个过程中保留当前消息。
- out message>_<-abrttemporarily 包含回复信息。
所有消息类型都由同一 Java 对象 org.apache.camel.Message
来表示。并非始终需要自定义消息的实施,即 default Message
通常会足够。
38.1.2. 在路由中使用组件
概述
Apache Camel 路由基本上是一个处理器的管道,即 org.apache.camel.Processor 类型。消息封装在交换对象 E
中,通过调用 process()
方法从节点传递到节点。处理器管道的架构在 图 38.2 “Route 中的使用者和 Producer 实例” 中进行了说明。
图 38.2. Route 中的使用者和 Producer 实例
源端点
在路由开始时,您有源端点,由 org.apache.camel.Consumer
对象表示。源端点负责接受传入的请求消息并分配回复。在构建路由时,Apache Camel 根据端点 URI 的组件前缀创建适当的 Consumer
类型,如 第 38.1.1 节 “组件的工厂模式” 所述。
处理器
管道中的每个中间节点由处理器对象(实施 org.apache.camel.Processor 接口)表示。您可以插入标准处理器(例如,过滤
、throttler
或 delayer
)或者插入您自己的自定义处理器实施。
目标端点
路由的末尾是目标端点,由 org.apache.camel.Producer
对象代表。由于它源自处理器管道,因此制作者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据来自端点 URI 的组件前缀来创建适当的 Producer
类型。
38.1.3. 消费者模式和线程
概述
用于实施使用者的模式决定了处理传入交换时使用的线程模型。消费者可使用以下模式之一实施:
- 事件驱动的模式 abrt- the consumer 由外部线程驱动。
- 调度轮询模式 InventoryService ->_<The consumer is by a dedicated thread pool(一个专用的线程池)。
- Polling pattern InventoryService- the threading 模型保留为未定义状态。
事件驱动的模式
在事件驱动的模式中,当应用程序的另一个部分(通常是第三方库)调用消费者实施的方法时,发起传入请求的处理。事件驱动的消费者是一个很好的例子,即 Apache Camel JMX 组件,其中的事件由 JMX 库启动。JMX 库调用 handleNotification()
方法,以启动 requests processing>_<-abrt,有关详细信息,请参阅 例 41.4 “JMXConsumer 实施”
图 38.3 “event-Driven Consumer” 显示了事件驱动的消费者模式的概要。在本例中,假定调用 notify()
方法触发了处理。
图 38.3. event-Driven Consumer
事件驱动的消费者按如下方式处理传入请求:
使用者必须实施一种方法才能接收传入事件(在 图 38.3 “event-Driven Consumer” 中,这由
notify()
方法表示)。调用notify()
的线程通常是应用的独立部分,因此消费者的线程策略是外部的。例如,在 JMX 消费者实施的情况下,使用者实施
NotificationListener.handleNotification()
方法,以接收来自 JMX 的通知。驱动消费者处理的线程在 JMX 层内创建。-
在
notify()
方法的正文中,使用者首先将传入的事件转换为交换对象E
,然后在路由中的下一个处理器调用process()
,并将交换对象作为参数传递。
调度的轮询模式
在计划的轮询模式中,使用者通过定期检查间隔来检索传入的请求,无论请求是否到达。内置的计时器类( 调度的 executor 服务 )会自动检查请求,这是 java.util.concurrent 库提供的标准模式。调度的 executor 服务在固定间隔时执行特定的任务,还可管理一个线程池,用于运行任务实例。
图 38.4 “调度的 Poll Consumer” 显示计划的轮询消费者模式的概要。
图 38.4. 调度的 Poll Consumer
调度的轮询消费者处理传入请求,如下所示:
-
调度的 executor 服务有一个线程池,可用于启动消费者处理。在各个调度的时间间隔过后,调度的 executor 服务会尝试从池中获取可用线程(默认情况下,池中有五个线程)。如果可用线程可用,它将使用该线程在消费者上调用
poll()
方法。 -
使用者的
poll()
方法用于触发传入请求的处理。在poll()
方法的正文中,使用者会尝试检索传入的消息。如果没有请求可用,则poll()
方法会立即返回。 -
如果请求消息可用,则使用者将其插入到交换对象中,然后在路由中的下一个处理器调用
process()
,并将交换对象传递至其参数。
轮询模式
在轮询模式中,当第三方调用一个消费者轮询方法时,启动传入请求的处理:
-
receive()
-
receiveNoWait()
-
接收(长超时)
组件实施过程由组件实施来定义精确的机制,以发起对轮询方法的调用。这种机制在轮询模式中没有指定。
图 38.5 “polling Consumer” 显示了轮询消费者模式的概要。
图 38.5. polling Consumer
轮询消费者按如下方式处理传入请求:
- 每当称为消费者的轮询方法之一时,启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
在
receive()
方法的正文中,使用者会尝试检索传入的请求消息。如果目前没有可用的消息,则行为取决于调用哪个接收方法。-
receiveNoWait()
returns immediately -
接收(长超时)
等待指定的超时间隔[2] 在返回前 -
receive()
等待直到收到消息
-
-
如果请求消息可用,则使用者将其插入到交换对象中,然后在路由中的下一个处理器调用
process()
,并将交换对象传递至其参数。
38.1.4. 异步处理
概述
在处理交换时,制作者端点通常遵循 同步 模式。当制作者上的管道调用 process()
中的前面的处理器时,process()
方法将阻止,直到收到回复。在这种情况下,处理器的线程将会被阻止,直到制作者完成发送请求并接收回复的生命周期。
但有时候,您可能更喜欢将前面的处理器与制作者分离,从而使处理器的线程会立即释放,并且 process()
调用 不会阻止。在这种情况下,您应该使用 异步 模式实施制作者,为前面的处理器提供调用 process()
方法的非阻塞版本的选项。
为了让您了解不同的实现选项,本节描述了实施制作者端点的同步和异步模式。
同步制作者
图 38.6 “synchronous Producer” 显示同步制作者的概要,其中前面的处理器块直到制作者完成交换过程。
图 38.6. synchronous Producer
同步制作者会按如下方式处理交换:
-
管道中的前面的处理器调用制作者上的 synchronous
process()
方法,以启动同步处理。同步process()
方法采用单一交换参数。 -
在
process()
方法的正文中,生产者将请求(在消息中 )发送到端点。 -
如果交换模式需要,生产者会等待回复(出 消息)从端点到达。此步骤可能会导致
process()
方法无限期阻止。但是,如果交换模式没有强制回复,process()
方法可以在发送请求后立即返回。 -
当
process()
方法返回时,exchange 对象包含来自同步调用( Out 消息消息)的回复。
异步制作者
图 38.7 “asynchronous Producer” 展示了异步制作者的概要,其中生产者在子线程中处理交换,而前面的处理器在一定时间内不会被阻止。
图 38.7. asynchronous Producer
异步制作者以如下方式处理交换:
-
在处理器可以调用异步
process()
方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。 处理器调用制作者上的异步
process()
方法,以启动异步处理。异步process()
方法采用两个参数:- Exchange 对象
- 同步回调对象
-
在
process()
方法的正文中,制作者会创建一个可运行的
对象来封装处理代码。然后,制作者将这个 Runnable对象的
执行委托给子线程。 -
异步
process()
方法返回,从而释放处理器线程。交换处理将继续处于单独的子线程中。 -
Runnable
对象将 In 消息发送到端点。 -
如果交换模式需要,Run
nable
对象会等待回复(Out 或 Failure 消息)来到达端点。Runnable对象
会一直被阻止,直到收到回复为止。 -
在回复到达后,
Run
nable 对象会将回复(传出消息)插入到交换对象中,然后在异步回调对象上调用done()
。然后,异步回调会负责处理回复消息(在子线程中执行)。