第 38 章 实现组件


摘要

本章介绍了可用于实施 Apache Camel 组件的方法的一般概述。

38.1. 组件架构

38.1.1. 组件的工厂模式

概述

Apache Camel 组件由一组通过工厂模式相互相关的类组成。组件的主要入口点是组件对象本身( org.apache.camel. Component 类型的实例)。您可以使用 组件 对象作为工厂创建 Endpoint 对象,后者作为创建 ConsumerProducerExchange 对象的工厂。这些关系总结在 图 38.1 “组件工厂模式”

图 38.1. 组件工厂模式

组件工厂模式

组件

组件实现是端点工厂。组件实施器的主要任务是实现 Component.createEndpoint () 方法,它负责按需创建新端点。

每种组件都必须与端点 URI 中显示的 组件前缀 关联。例如,文件组件通常与 文件 前缀关联,该前缀可在端点 URI 中使用,如 file://tmp/messages/input。在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实现组件的类名称之间的关联。

端点

每个端点实例封装特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。

端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义以下工厂方法:

  • createConsumer ()createPollingConsumer () ProductShortName-wagonCreates 是一个消费者端点,它代表路由开头的源端点。
  • createProducer () criu- iwlCreates a producer 端点,它代表路由末尾的目标端点。
  • createExchange () mvapich- iwlCreates 一个交换对象,它封装了路由传递和关闭的消息。

消费者

消费者端点 消耗 请求。它们始终显示在路由开始时,并封装负责接收传入请求并发送传出回复的代码。来自面向服务的潜在消费者代表一个 服务

消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,您可以遵循多种不同的模式。这些模式在 第 38.1.3 节 “消费者模式和线程” 中进行了描述。

制作者

制作者端点 生成 请求。它们始终显示在路由的末尾,并封装负责分配传出请求并接收传入的回复的代码。来自面向服务的潜在生产者代表 服务消费者

生产者必须实施 org.apache.camel.Producer 接口。您可以选择实施制作者来支持异步处理。详情请查看 第 38.1.4 节 “异步处理”

Exchange

Exchange 对象封装一组相关的消息。例如,一种消息交换是同步调用,它由请求消息及其相关回复组成。

交换必须实施 org.apache.camel.Exchange 接口。默认的实现( DefaultExchange )足以满足许多组件实现的。但是,如果您想将额外的数据与交换相关联,或者具有交换预先处理,则自定义交换实施非常有用。

消息

Exchange 对象中有两个不同的消息插槽:

  • message iwl-wagonholds current message 中。
  • message iwl-wagontemporarily 包含回复消息。

所有消息类型都由相同的 Java 对象 org.apache.camel.Message 表示。通常并非始终需要自定义消息实现的 implementation implementation implementation DefaultMessage

38.1.2. 在路由中使用组件

概述

Apache Camel 路由基本上是 org.apache.camel.Processor 类型的处理器管道。消息封装在交换对象 E 中,后者通过调用 process () 方法从节点传递到节点。处理器管道的架构在 图 38.2 “路由中的消费者和 Producer 实例” 中显示。

图 38.2. 路由中的消费者和 Producer 实例

路由中的消费者和 Producer 实例

源端点

在路由开始时,您有源端点,该端点由 org.apache.camel.Consumer 对象表示。源端点负责接受传入的请求消息并发送回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Consumer 类型,如 第 38.1.1 节 “组件的工厂模式” 所述。

处理器

管道中的每个中间节点都由一个处理器对象表示(实施 org.apache.camel.Processor 接口)。您可以插入标准处理器(例如,过滤throttlerdelayer),或插入您自己的自定义处理器实现。

目标端点

在路由结束时,目标端点由 org.apache.camel.Producer 对象表示。由于它位于处理器管道的末尾,因此生产者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Producer 类型。

38.1.3. 消费者模式和线程

概述

用于实现消费者的模式决定了处理传入交换时使用的线程模型。可使用以下模式之一实施消费者:

事件驱动的模式

在事件驱动的模式中,当应用的另一个部分(通常是第三方库)调用由使用者实施的方法时,会启动传入请求的处理。事件驱动的消费者的一个很好的例子是 Apache Camel JMX 组件,其中由 JMX 库启动事件。JMX 库调用 handleNotification () 方法,来发起请求处理 TOTP-mvapichsee 例 41.4 “JMXConsumer 实现” 以了解详细信息。

图 38.3 “event-Driven Consumer” 显示事件驱动的消费者模式的概述。在本例中,假设处理是由调用 notify() 方法触发的。

图 38.3. event-Driven Consumer

使用事件驱动的消费者的消息链

事件驱动的消费者处理传入的请求,如下所示:

  1. 消费者必须实施一种方法来接收传入事件(在 图 38.3 “event-Driven Consumer” 中,这由 notify() 方法表示)。调用 通知() 的线程通常是应用的独立部分,因此消费者的线程策略是外部的。

    例如,如果实现 JMX 消费者,消费者实施 NotificationListener.handleNotification () 方法,以接收来自 JMX 的通知。驱动消费者处理的线程在 JMX 层内创建。

  2. notify()方法的 正文中,消费者首先将传入事件转换为交换对象 E,然后在路由中的下一个处理器上调用 process (),将 Exchange 对象作为其参数。

调度的轮询模式

在调度的轮询模式中,消费者定期检查请求是否到达,从而检索传入的请求。检查请求由内置计时器类( 调度的 executor 服务 )自动调度,后者是由 java.util.concurrent 库提供的标准模式。调度的 executor 服务以时间间隔执行特定的任务,它还管理一个线程池,用于运行任务实例。

图 38.4 “调度的 Poll Consumer” 显示计划的轮询消费者模式的概述。

图 38.4. 调度的 Poll Consumer

调度的 Poll Consumer

调度的轮询消费者处理传入的请求,如下所示:

  1. 调度的 executor 服务有一个线程池,可用于启动消费者处理。每次调度的时间间隔后,调度的 executor 服务会尝试从其池中获取空闲线程(默认为池中的五个线程)。如果有空闲线程可用,它会使用该线程调用消费者的 poll () 方法。
  2. 消费者的 poll () 方法旨在触发传入请求的处理。在 poll () 方法的正文中,消费者会尝试检索传入的消息。如果没有可用的请求,poll () 方法会立即返回。
  3. 如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用 process (),将 exchange 对象作为其参数。

轮询模式

在轮询模式中,当第三方调用其中一个消费者轮询方法时,会启动传入请求的处理:

  • receive()
  • receiveNoWait()
  • receive (long timeout)

取决于组件实施,用于定义在轮询方法上启动调用的精确机制。这种机制不是由轮询模式指定。

图 38.5 “polling Consumer” 显示轮询消费者模式的概述。

图 38.5. polling Consumer

polling Consumer

轮询消费者处理传入的请求,如下所示:

  1. 每当调用其中一个消费者的轮询方法时,会启动传入请求的处理。调用这些轮询方法的机制已定义。
  2. receive () 方法的正文中,消费者尝试检索传入的请求消息。如果当前没有可用的消息,则行为取决于调用哪个接收方法。

    • receiveNoWait () 立即返回
    • receive (长超时) 等待指定的超时时间[2] 返回前
    • receive () 等待消息被接收
  3. 如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用 process (),将 exchange 对象作为其参数。

38.1.4. 异步处理

概述

生产者端点通常在处理交换时遵循 同步 模式。当管道前面的处理器在生成者上调用 process () 时,process () 方法会阻止,直到收到回复为止。在这种情况下,处理器的线程会卡住,直到生成者完成发送请求并接收回复的周期。

但是,您可能更喜欢将前面的处理器与生成者分离,因此处理器的线程会立即释放,且 process () 调用 不会 阻断。在这种情况下,您应该使用 异步 模式实施制作者,这为前面的处理器提供调用 process () 方法的非阻塞版本的选项。

为了让您了解不同的实施选项,本节描述了实施制作者端点的同步和异步模式。

同步制作者

图 38.6 “同步 Producer” 显示同步制作者的概述,其中前面的处理器块直到生成者完成处理交换为止。

图 38.6. 同步 Producer

同步 Producer

同步制作者按如下方式处理交换:

  1. 管道中的前面的处理器调用制作者上的同步 process () 方法,以启动同步处理。同步 process () 方法采用单个交换参数。
  2. process () 方法的正文中,生成者将请求(In message)发送到端点。
  3. 如果交换模式需要,生成者会等待回复(传出 消息)到达端点。此步骤可能会导致 process () 方法无限期阻止。但是,如果交换模式不强制回复,则 process () 方法可以在发送请求后立即返回。
  4. process () 方法返回时,exchange 对象包含来自同步调用的回复(明确消息消息)。

异步制作者

图 38.7 “异步 Producer” 显示异步制作者的概述,其中生成者在子线程中处理交换,并且前面的处理器在任何显著长度没有被阻止。

图 38.7. 异步 Producer

异步 Producer

异步制作者按如下方式处理交换:

  1. 在处理器可以调用异步 process () 方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由的返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。
  2. 处理器调用制作者上的异步 process () 方法,以启动异步处理。异步 process () 方法采用两个参数:

    • 一个交换对象
    • 同步回调对象
  3. process () 方法的正文中,生成者会创建一个 可运行的 对象来封装处理代码。然后,生成者会将此 Runnable 对象的 执行委托给子线程。
  4. 异步 process () 方法返回,从而释放处理器的线程。交换处理在单独的子线程中继续。
  5. Runnable 对象将 In 消息发送到端点。
  6. 如果交换模式需要,Runnable 对象会等待回复(OutFault 消息)到达端点。在收到回复前,Runnable 对象会一直被阻止。
  7. 在回复到达后,Run nable 对象将回复(Out 消息)插入到交换对象中,然后在异步回调对象上调用 done ()。然后,异步回调负责处理回复消息(在子线程中执行)。


[2] 超时间隔通常以毫秒为单位指定。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.