第 38 章 实施组件


摘要

本章介绍了可用于实施 Apache Camel 组件的方法的一般概述。

38.1. 组件架构

38.1.1. 组件工厂模式

概述

Apache Camel 组件由一组通过工厂模式相互相关的类组成。主入口点指向组件对象本身( org.apache.camel. Component 类型的实例 )。您可以使用 组件 对象作为工厂创建 Endpoint 对象,后者充当创建 消费者ProducerExchange 对象的因素。这些关系总结在 图 38.1 “组件工厂模式”

图 38.1. 组件工厂模式

组件工厂模式

组件

组件实现是一个端点工厂。组件实施者的主要任务是实施 Component.createEndpoint () 方法,它负责根据需要创建新端点。

每个组件都必须与端点 URI 中显示的 组件前缀 关联。例如,文件组件通常与 文件 前缀关联,该前缀可在 file://tmp/messages/input 等端点 URI 中使用。在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实施组件的类名称之间的关联。

端点

每个端点实例封装特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。

端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义以下工厂方法:

  • createConsumer ()createPollingConsumer () HEKETI-wagonCreates 是一个消费者端点,它代表路由开头的源端点。
  • createProducer () 5-4Creates a producer 端点,它代表路由末尾的目标端点。
  • createExchange () HEKETI-wagonCreates 是一个交换对象,它封装了传递和关闭路由的消息。

消费者

消费者端点 使用 请求。它们始终出现在路由的开头,它们封装负责接收传入请求和分配传出回复的代码。从面向服务的角度来说,使用者代表 服务

消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,您可以遵循许多不同的模式。这些模式在 第 38.1.3 节 “消费者模式和线程” 中进行了描述。

制作者

生产者端点 生成 请求。它们始终出现在路由的末尾,它们封装负责分配传出请求并接收传入回复的代码。从面向服务的视角中,生成者代表 服务消费者

生产者必须实施 org.apache.camel.Producer 接口。您可以选择实施制作者,以支持异步处理风格。详情请查看 第 38.1.4 节 “异步处理”

Exchange

Exchange 对象封装一组相关的消息。例如,一种消息交换是同步调用,由请求消息及其相关回复组成。

交换必须实施 org.apache.camel.Exchange 接口。默认实施 DefaultExchange 足以满足许多组件实现的情况。但是,如果您想要将额外的数据与交换相关联,或者有交换前进行额外的处理,那么自定义交换实施会很有用。

消息

Exchange 对象中有两个不同的消息插槽:

  • 在消息 swig-Cloneholds the current 信息中。
  • out message swig-wagontemporarily 包含回复信息。

所有消息类型都由同一 Java 对象 org.apache.camel.Message 表示。并不总是需要自定义消息实现 swig-wagonthe default 实现 DefaultMessage,通常就足够了。

38.1.2. 在路由中使用组件

概述

Apache Camel 路由本质上是 org.apache.camel.Processor 类型的处理器管道。消息封装在交换对象 E 中,通过调用 process () 方法从节点传递到节点。处理器管道的架构在 图 38.2 “路由中的消费者和生成实例” 中进行了说明。

图 38.2. 路由中的消费者和生成实例

路由中的消费者和 Producer 实例

源端点

在路由开始时,您有源端点,该端点由 org.apache.camel.Consumer 对象表示。源端点负责接受传入请求消息并分配回复。在构建路由时,Apache Camel 根据端点 URI 的组件前缀创建适当的 Consumer 类型,如 第 38.1.1 节 “组件工厂模式” 所述。

Processors

管道中的每个中间节点都由处理器对象(实现 org.apache.camel.Processor 接口)表示。您可以插入标准处理器(例如,过滤throttlerdelayer),或者插入您自己的自定义处理器实现。

目标端点

在路由的末尾,目标端点由 org.apache.camel.Producer 对象表示。由于它位于处理器管道的末尾,因此生成者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据端点 URI 的组件前缀创建适当的 Producer 类型。

38.1.3. 消费者模式和线程

概述

用于实现使用者的模式决定了处理传入交换时使用的线程模型。消费者可使用以下模式之一实施:

事件驱动的模式

在事件驱动的模式中,当应用的另一部分(通常是第三方库)调用由消费者实施的方法时,会启动传入请求的处理。事件驱动的消费者的一个很好的例子是 Apache Camel JMX 组件,其中事件由 JMX 库启动。JMX 库调用 handleNotification () 方法来发起请求处理 processing swig-wagon 请参见 例 41.4 “JMXConsumer 实现” 以了解详细信息。

图 38.3 “event-Driven Consumer” 显示事件驱动的消费者模式的概要。在本例中,假设处理是由对 notify() 方法的调用触发的。

图 38.3. event-Driven Consumer

使用事件驱动的消费者的消息队列

事件驱动的消费者处理传入的请求,如下所示:

  1. 消费者必须实施接收传入事件的方法(在 图 38.3 “event-Driven Consumer” 中,由 notify() 方法表示)。调用 notify() 的线程通常是应用的独立部分,因此消费者的线程策略是外部驱动的。

    例如,如果是 JMX 消费者实施,使用者实施 NotificationListener.handleNotification () 方法,以便从 JMX 接收通知。驱动消费者处理的线程是在 JMX 层中创建的。

  2. notify() 方法的正文中,消费者首先将传入的事件转换为交换对象 E,然后在路由中的下一个处理器上调用 process (),并将交换对象作为参数传递。

调度的轮询模式

在调度的轮询模式中,使用者定期检查请求是否到达,从而检索传入的请求。检查请求由内置的计时器类(计划的 executor 服务)自动调度,该服务 是由 java.util.concurrent 库提供的标准模式。调度的 executor 服务以时段执行特定的任务,并且管理一个用于运行任务实例的线程池。

图 38.4 “Scheduled Poll Consumer” 显示调度的轮询消费者模式的概要。

图 38.4. Scheduled Poll Consumer

Scheduled Poll Consumer

调度的轮询消费者处理传入的请求,如下所示:

  1. 调度的 executor 服务有一个线程池,可用于启动消费者处理。在各个调度的时间间隔被过后,调度的 executor 服务会尝试从池中获取可用线程(默认为池中的五个线程)。如果有可用的线程可用,它会使用该线程调用消费者的 poll () 方法。
  2. 消费者的 poll () 方法旨在触发传入请求的处理。在 poll () 方法的正文中,使用者会尝试检索传入的消息。如果没有可用的请求,poll () 方法会立即返回。
  3. 如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用 process (),并将交换对象作为其参数。

轮询模式

在轮询模式中,当第三方调用消费者的轮询方法之一时,启动传入请求的处理:

  • receive()
  • receiveNoWait()
  • receive (长超时)

组件实施最多用于定义在轮询方法上启动调用的确切机制。这种机制不在轮询模式中指定。

图 38.5 “polling Consumer” 显示轮询消费者模式的概述。

图 38.5. polling Consumer

polling Consumer

轮询消费者处理传入的请求,如下所示:

  1. 每当调用使用者轮询方法之一时,都会启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
  2. receive () 方法的正文中,使用者会尝试检索传入的请求消息。如果当前没有消息,则行为取决于调用哪个接收方法。

    • receiveNoWait () 立即返回
    • receive (长超时) 等待指定的超时间隔[2] 返回前
    • receive () 等待收到消息
  3. 如果请求消息可用,则消费者将其插入到交换对象中,然后在路由中的下一个处理器上调用 process (),并将交换对象作为其参数。

38.1.4. 异步处理

概述

在处理交换时,生成者端点通常遵循 同步 模式。当生成者上的管道调用 process () 中的上述处理器时,process () 方法会阻止,直至收到回复。在这种情况下,处理器的线程会被阻断,直到生成者完成发送请求和接收回复的周期。

但是,有时您可能希望将前面的处理器与生成者分离,因此处理器的线程会立即释放,并且 process () 调用 不会 阻断。在这种情况下,您应该使用 异步 模式实施制作者,这为前面的处理器提供调用 process () 方法的非阻塞版本的选项。

为了让您了解不同的实施选项,本节描述了用于实施制作者端点的同步和异步模式。

同步制作者

图 38.6 “同步 Producer” 显示同步制作者的概述,其中前面的处理器块直到生产者完成交换为止。

图 38.6. 同步 Producer

同步 Producer

同步制作者按如下方式处理交换:

  1. 管道中的前面的处理器调用制作者上的 同步进程 () 方法来启动同步处理。同步 process () 方法采用单个交换参数。
  2. process () 方法的正文中,生成者将请求(消息)发送到端点。
  3. 如果交换模式需要,生成者会等待回复(Out 消息)到达端点。此步骤可能会导致 process () 方法无限期阻止。但是,如果交换模式不强制回复,则 process () 方法可以在发送请求后立即返回。
  4. process () 方法返回时,交换对象包含来自同步调用的回复( Out 消息消息)。

异步制作者

图 38.7 “异步生成” 显示了异步制作者的概要,其中生成者在子线程中处理交换,并且前面的处理器不会因任何显著时长而被阻止。

图 38.7. 异步生成

异步生成

异步制作者按如下方式处理交换:

  1. 在处理器可以调用异步 process () 方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由的返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。
  2. 处理器调用制作者上的异步 process () 方法,以启动异步处理。异步 process () 方法采用两个参数:

    • 交换对象
    • 同步回调对象
  3. process () 方法的正文中,生成者会创建一个可封装处理代码的 可运行 对象。然后,生产者将此 可运行 对象的执行委派给子线程。
  4. 异步 process () 方法返回,从而释放处理器的线程。交换处理将继续在不同的子线程中。
  5. Runnable 对象将 In 消息发送到端点。
  6. 如果交换模式需要,Runnable 对象会等待回复(OutFault 消息)到达端点。Runnable 对象 会被阻断,直到收到回复为止。
  7. 回复到达后,Run nable 对象会将回复(Out 消息)插入到交换对象中,然后在异步回调对象上调用 done ()。然后,异步回调负责处理回复消息(在子线程中执行)。


[2] 超时间隔通常以毫秒为单位指定。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.