第 38 章 实施组件
摘要
本章概述了可用于实施 Apache Camel 组件的方法。
38.1. 组件架构
38.1.1. 组件的工厂模式
概述
Apache Camel 组件由一组通过工厂模式相互相关的类组成。组件的主要入口点是 components 对象本身(一个 org.apache.camel.Component
类型)实例。您可以将
Component
对象用作工厂创建 Endpoint
对象,后者又充当创建 消费者
、生产者
和 Exchange
Exchange 对象的工厂。这些关系在 中进行了概述 图 38.1 “组件因素模式”
图 38.1. 组件因素模式
组件
组件实现是端点工厂。组件实现器的主要任务是实施 component .createEndpoint ()
方法,它负责按需创建新端点。
每种组件必须与端点 URI 中显示的组件前缀 关联。例如,文件组件通常与 文件 前缀相关联,前缀可在类似 file://tmp/messages/input 的端点 URI 中使用。在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实施组件的类名称之间的关联。
端点
每个端点实例封装了一个特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它都会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。
端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义了以下工厂方法:
-
createConsumer ()
和createPollingConsumer ()
Memcached-»Creates consumer 端点,它代表路由开头的源端点。 -
createProducer ()
方式为 producer 端点,它代表路由末尾的目标端点。 -
createExchange ()
CURRENTCreates 是一个交换对象,它封装了传递和关闭路由的消息。
消费者
消费者端点 使用 请求。它们始终显示在路由开始时,它们封装了负责接收传入请求并发送传出回复的代码。从面向服务的视角来看,消费者表示 服务。
消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,您可以遵循许多不同的模式。第 38.1.3 节 “消费者模式和线程” 描述了这些模式。
制作者
制作者端点 生成 请求。它们始终出现在路由的末尾,它们封装代码负责发送传出请求并接收传入的回复。从面向服务的视角来看,生产者代表 服务使用者。
生产者必须实施 org.apache.camel.Producer
接口。您可以选择实施制作者以支持异步处理方式。详情请查看 第 38.1.4 节 “异步处理”。
Exchange
交换对象封装一组相关的消息。例如,一种消息交换是同步调用,它由请求消息及其相关回复组成。
Exchanges 必须实施 org.apache.camel.Exchange 接口。默认实施 DefaultExchange
足以满足许多组件实现。但是,如果要与交换关联额外数据,或者交换了额外的处理,那么自定义交换实施会很有用。
消息
- 在 messagetimer-sandboxed 中保留当前的消息。
- out message-2021-33-的temporaritaly 包含回复消息。
所有消息类型都由同一 Java 对象 org.apache.camel.Message
表示。自定义消息 implementation将默认实现 DefaultMessage
认为是不够的,并非始终是必要的。
38.1.2. 在 Route 中使用组件
概述
Apache Camel 路由基本上是一个处理器管道,即 org.apache.camel.Processor 类型。消息封装在一个交换对象 E
中,通过调用 process ()
方法从节点传递到节点。处理器管道的架构在 图 38.2 “路由中的使用者和生产者实例” 中进行了说明。
图 38.2. 路由中的使用者和生产者实例
源端点
在路由开始时,您具有源端点,该端点由 org.apache.camel.Consumer
对象表示。源端点负责接受传入请求消息并分配回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Consumer
类型,如 第 38.1.1 节 “组件的工厂模式” 所述。
处理器
管道中的每个中间节点都由一个处理器对象(实施 org.apache.camel.Processor 接口)表示。您可以插入标准处理器(例如,过滤
、throttler
或 delayer
)或插入您自己的自定义处理器实现。
目标端点
路由末尾是目标端点,它由 org.apache.camel.Producer
对象表示。由于它位于处理器管道的末尾,因此制作者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Producer
类型。
38.1.3. 消费者模式和线程
概述
用于实施使用者的模式决定了处理传入交换时使用的线程模型。消费者可使用以下模式之一实现:
- event-driven pattern TOKEN-sandboxed The consumer 由外部线程驱动。
- 调度的 poll 模式 TOKEN-将消费者由专用线程池驱动。
- Polyperation pattern 记录器 - threading model is left undefined。
事件驱动的模式
在事件驱动模式中,当应用的另一部分(通常是第三方库)的另一部分时,会启动传入请求的处理,调用使用者实施的方法。事件驱动使用者是一个很好的示例,即 Apache Camel JMX 组件,其中事件由 JMX 库启动。JMX 库调用 handleNotification ()
方法来发起请求处理将 processing processing-clusterSelector 查看详细信息,请参阅 例 41.4 “JMXConsumer 实施”。
图 38.3 “event-Driven Consumer” 显示事件驱动的消费者模式的概述。在本例中,假定处理是由 notify()
方法的调用触发。
图 38.3. event-Driven Consumer
事件驱动的消费者处理传入的请求,如下所示:
消费者必须实施一种方法来接收传入的事件(在 图 38.3 “event-Driven Consumer” 中,这由
notify()
方法表示)。调用notify()
的线程通常是应用的一个单独部分,因此消费者的线程是外部驱动的。例如,在 JMX 消费者实施时,消费者实施
NotificationListener.handleNotification ()
方法,从 JMX 接收通知。驱动消费者处理的线程在 JMX 层内创建。-
在
notify()
方法的正文中,使用者首先将传入的事件转换为交换对象E
,然后在路由中的下一个处理器上调用process ()
,并将 Exchange 对象作为其参数。
调度的轮询模式
在调度的轮询模式中,消费者通过定期检查请求是否到达,从而检索传入请求。检查请求由内置计时器类、计划的 executor 服务自动调度,这是 java.util.concurrent 库提供的标准模式。调度的 executor 服务以定时间隔执行特定的任务,还管理一组用于运行任务实例的线程池。
图 38.4 “调度的轮询消费者” 显示调度的轮询消费者模式的概要。
图 38.4. 调度的轮询消费者
调度的轮询消费者处理传入请求,如下所示:
-
调度的 executor 服务有一组线程池,可用于启动消费者处理。在生成了每个调度的时间间隔后,调度的 executor 服务会尝试从池中获取空闲线程(默认为五个线程)。如果有可用线程,它将使用该线程来调用使用者上的
poll ()
方法。 -
消费者的
poll ()
方法旨在触发传入请求的处理。在poll ()
方法的正文中,使用者会尝试检索传入的消息。如果没有请求可用,则poll ()
方法会立即返回。 -
如果请求消息可用,使用者将其插入到交换对象中,然后在路由中的下一个处理器上调用
process ()
,将 Exchange 对象作为参数传递。
轮询模式
在轮询模式中,当第三方调用其中一个消费者轮询方法时,启动传入请求的处理:
-
receive()
-
receiveNoWait()
-
receive(long timeout)
这些组件的实施是用来定义启动轮询方法调用的精确机制。这种机制在轮询模式中没有指定。
图 38.5 “轮询消费者” 显示轮询消费者模式的概述。
图 38.5. 轮询消费者
轮询使用者处理传入的请求,如下所示:
- 每当调用其中一个消费者的轮询方法时,都会启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
在
receive ()
方法的正文中,使用者会尝试检索传入请求消息。如果目前没有消息可用,则行为将取决于调用哪个接收方法。-
receiveNoWait()
returns immediately -
receive (长超时)
等待指定的超时间隔[2] 在返回前 -
receive ()
等待收到消息
-
-
如果请求消息可用,使用者将其插入到交换对象中,然后在路由中的下一个处理器上调用
process ()
,将 Exchange 对象作为参数传递。
38.1.4. 异步处理
概述
制作者端点在处理交换时通常遵循 同步 模式。当管道中的上述处理器在制作者上调用 process ()
时,process ()
方法会阻止,直到收到回复为止。在这种情况下,处理器的线程会保留阻止,直到制作者完成发送请求并接收回复的周期。
但有时候,您可能更倾向于将上述处理器与生产者分离,以便立即释放处理器的线程,而 process ()
调用也 不会阻止。在这种情况下,您应该使用 异步 模式实施制作者,这为前一个处理器提供了调用 进程()
方法的非阻塞版本的选项。
为了让您了解不同的实施选项,本节将同时描述实施制作者端点的同步和异步模式。
同步制作者
图 38.6 “同步生产者” 显示同步制作者的概述,其中前面的处理器块直到生产者完成交换完成。
图 38.6. 同步生产者
同步制作者会按如下方式处理交换:
-
管道中的前面的处理器调用制作者上的 synchronous
process ()
方法来启动同步处理。同步process ()
方法采用单一交换参数。 -
在
process ()
方法的正文中,生产者将请求(消息)发送到端点。 -
如果交换模式要求,则制作者会等待回复(Out message)从端点到达。此步骤可能会导致
process ()
方法无限期阻止。但是,如果交换模式不强制回复,则process ()
方法可以在发送请求后立即返回。 -
当
process ()
方法返回时,交换对象包含来自同步调用(an Out message)的回复。
异步制作者
图 38.7 “异步 Producer” 显示异步制作者的概述,生产者在子线程中处理交换,并且上述处理器不会因任何大量时间而阻止。
图 38.7. 异步 Producer
异步制作者处理交换,如下所示:
-
在处理器可以调用异步
process ()
方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。 处理器调用制作者上的异步
process ()
方法来启动异步处理。异步process ()
方法采用两个参数:- Exchange 对象
- 同步回调对象
-
在
process ()
方法的正文中,生产者会创建一个可运行的对象,用于封装处理代码。然后,生产者会将此
Runnable
对象的执行委托给一个子线程。 -
异步
process ()
方法返回,从而释放处理器的线程。交换处理将继续处于单独的子线程。 -
Runnable
对象将 In 消息发送到端点。 -
如果交换模式需要,
可运行的
对象会等待回复(Out 或 Fault 消息)到达端点。在收到回复前,可运行的对象会保持阻止。 -
在回复到达后,
可运行的
对象将回复(Out 消息)插入到交换对象中,然后在异步回调对象上调用done ()
。然后,异步回调负责处理回复消息(在子线程中执行)。