2.8. 线程模型
Java 线程池 API
Apache Camel 线程模型基于强大的 Java 并发 API Package java.util.concurrent,最初在 Sun 的 JDK 1.5 中提供。此 API 中的关键接口是 ExecutorService
接口,它代表一个线程池。使用并发 API,您可以创建多种不同类型的线程池,涵盖广泛的场景。
Apache Camel 线程池 API
Apache Camel 线程池 API 基于 Java 并发 API 构建,方法是为 Apache Camel 应用程序中的所有线程池提供中央工厂( org.apache.camel.spi.ExecutorServiceManager
类型)。以这种方式集中创建线程池提供了几个优点,包括:
- 使用实用程序类简化线程池的创建。
- 将线程池与正常关闭集成.
- 线程会自动获得丰富的名称,这对于记录和管理很有帮助。
组件线程模型
一些 Apache Camel 组件在 multi-threaded 等,如 SEDA、JMS 和 Jetty PACKAGE-storageclassare 本质上是多线程。这些组件均使用 Apache Camel 线程模型和线程池 API 实施。
如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模型集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager
对象创建它。
处理器线程模型
在 Apache Camel 中某些标准处理器默认创建了自己的线程池。这些线程感知处理器也与 Apache Camel 线程处理模型集成,它们提供了各种选项,供您自定义其使用的线程池。
表 2.8 “处理器线程选项” 显示各种选项,用于控制和设置线程感知处理器上内置到 Apache Camel 的线程池。
处理器 | Java DSL | XML DSL |
---|---|---|
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
executorService() executorServiceRef() poolSize() maxPoolSize() keepAliveTime() timeUnit() maxQueueSize() rejectedPolicy() |
@executorServiceRef @poolSize @maxPoolSize @keepAliveTime @timeUnit @maxQueueSize @rejectedPolicy |
|
wireTap(String uri, ExecutorService executorService) wireTap(String uri, String executorServiceRef) |
@executorServiceRef |
线程 DSL 选项
线程
处理器是一个通用的 DSL 命令,可用于将线程池引入到路由中。它支持以下选项来自定义线程池:
poolSize()
- 池中的最小线程数量(及初始池大小)。
maxPoolSize()
- 池中线程的最大数量。
keepAliveTime()
- 如果任何线程空闲的时间超过这个时间(以秒为单位指定),则终止它们。
timeUnit()
-
保留生存时间的时间,使用
java.util.concurrent.TimeUnit
类型进行指定。 maxQueueSize()
- 此线程池可以存储在其传入任务队列中的最大待处理任务数量。
rejectedPolicy()
- 指定传入任务队列已满时要执行的操作课程。请查看 表 2.10 “线程池构建器选项”
前面的线程池选项与 executorServiceRef
选项 不兼容 (例如:您无法使用这些选项来覆盖由 executorServiceRef
选项引用的线程池中的设置)。Apache Camel 验证 DSL 来执行此操作。
创建默认线程池
要为其中一个线程感知处理器创建默认线程池,启用 parallelProcessing
选项,使用 Java DSL 中的 parallelProcessing ()
子使用、在 XML DSL 中的 parallelProcessing 属性或 parallelProcessing
属性启用 parallelProcessing 选项。
例如,在 Java DSL 中,您可以使用默认的线程池(虚拟机监控程序同时处理多播目的地)调用多播处理器,如下所示:
from("direct:start") .multicast().parallelProcessing() .to("mock:first") .to("mock:second") .to("mock:third");
您可以在 XML DSL 中定义相同的路由,如下所示
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <multicast parallelProcessing="true"> <to uri="mock:first"/> <to uri="mock:second"/> <to uri="mock:third"/> </multicast> </route> </camelContext>
默认线程池配置集设置
默认线程池由线程工厂自动创建,从 默认的线程池配置文件 中获取其设置。默认线程池配置集在 表 2.9 “默认线程池配置集设置” 中显示设置(假设这些设置没有被应用程序代码修改)。
线程选项 | 默认值 |
---|---|
|
|
|
|
|
|
|
|
|
|
更改默认线程池配置集
可以更改默认线程池配置文件设置,这样将使用自定义设置创建后续的所有默认线程池。您可以在 Java 或 Spring XML 中更改配置集。
例如,在 Java DSL 中,您可以自定义默认线程池配置集中的 poolSize
选项和 maxQueueSize
选项,如下所示:
// Java import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.ThreadPoolProfile; ... ExecutorServiceManager manager = context.getExecutorServiceManager(); ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile(); // Now, customize the profile settings. defaultProfile.setPoolSize(3); defaultProfile.setMaxQueueSize(100); ...
在 XML DSL 中,您可以自定义默认线程池配置集,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
请注意,在前面的 XML DSL 示例中将 defaultProfile
属性设置为 true
,否则线程池配置集将被视为自定义线程池配置文件(请参阅 “创建自定义线程池配置集”一节),而不是替换默认的线程池配置集。
自定义处理器的线程池
也可以使用 executorService
或 executorServiceRef
选项来直接为线程识别处理器指定线程池(其中使用了这些选项而不是 并行处理
选项)。您可以使用两种方法来自定义处理器的线程池,如下所示:
-
指定自定义线程池 WWN - insufficientexplicitly 创建
ExecutorService
(线程池)实例,并将它传递到executorService
选项。 -
指定自定义线程池配置集 TOKEN -create 并注册自定义线程池工厂。当您使用
executorServiceRef
选项引用此工厂时,处理器会自动使用 factory 来创建自定义线程池实例。
当您将 bean ID 传递给 executorServiceRef
选项时,线程感知处理器首先会尝试在 registry 中找到具有该 ID 的自定义线程池。如果没有使用该 ID 注册线程池,处理器将尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置文件来实例化自定义线程池。
创建自定义线程池
自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。在 Apache Camel 中创建线程池实例的以下方法如下:
-
使用
org.apache.camel.builder.ThreadPoolBuilder
实用程序构建线程池类。 -
使用当前
CamelContext
中的org.apache.camel.spi.ExecutorServiceManager
实例来创建线程池类。
最终,这两种方法之间没有区别,因为 ThreadPoolBuilder
实际上使用 ExecutorServiceManager
实例进行定义。通常,使用 ThreadPoolBuilder
是首选的,因为它提供了更简单的方法。但是,至少有一个线程( ScheduledExecutorService
)仅可通过直接访问 ExecutorServiceManager
实例来创建。
表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder
类支持的选项,您可以在定义新的自定义线程池时设置这些选项。
构建器选项 | 描述 |
---|---|
|
设置此线程池可以在其传入任务队列中存储的最大待处理任务数量。值 |
| 设置池中最小线程数量(这也是初始池大小)。默认值取自默认的线程池配置集。 |
| 设置池中可以的最大线程数。默认值取自默认的线程池配置集。 |
| 如果任何线程空闲的时间超过这个时间(以秒为单位指定),则终止它们。这允许线程池在负载较轻时缩小。默认值取自默认的线程池配置集。 |
| 指定传入任务队列已满时要执行的操作课程。您可以指定四个可能的值:
|
|
完成构建自定义线程池,并在指定为 |
在 Java DSL 中,您可以使用 ThreadPoolBuilder
定义自定义线程池,如下所示:
// Java import org.apache.camel.builder.ThreadPoolBuilder; import java.util.concurrent.ExecutorService; ... ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context); ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool"); ... from("direct:start") .multicast().executorService(customPool) .to("mock:first") .to("mock:second") .to("mock:third");
除了直接将 beutorService ()选项直接传递给 executorService ()
选项,您可以查询 registry 中的线程池,而是将其 bean ID 传递给 executorServiceRef ()
选项,如下所示:
// Java from("direct:start") .multicast().executorServiceRef("customPool") .to("mock:first") .to("mock:second") .to("mock:third");
在 XML DSL 中,您可以使用 threadPool
元素访问 ThreadPoolBuilder
。然后,您可以使用 executorServiceRef
属性引用自定义线程池,根据 Spring registry 中的 ID 查找线程池,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <threadPool id="customPool" poolSize="5" maxPoolSize="5" maxQueueSize="100" /> <route> <from uri="direct:start"/> <multicast executorServiceRef="customPool"> <to uri="mock:first"/> <to uri="mock:second"/> <to uri="mock:third"/> </multicast> </route> </camelContext>
创建自定义线程池配置集
如果您有多个要创建的自定义线程池实例,您可能会更方便地定义自定义线程池配置文件,它充当线程池的工厂。每当您从线程感知处理器引用线程池配置集时,处理器会自动使用该配置集来创建新的线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。
例如,在 Java DSL 中,您可以使用 bean ID、CustomProfile
并在路由中引用它,如下所示:
// Java import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.impl.ThreadPoolProfileSupport; ... // Create the custom thread pool profile ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile"); customProfile.setPoolSize(5); customProfile.setMaxPoolSize(5); customProfile.setMaxQueueSize(100); context.getExecutorServiceManager().registerThreadPoolProfile(customProfile); ... // Reference the custom thread pool profile in a route from("direct:start") .multicast().executorServiceRef("customProfile") .to("mock:first") .to("mock:second") .to("mock:third");
在 XML DSL 中,使用 threadPoolProfile
元素创建一个自定义池配置文件(您可让 defaultProfile
选项默认为 false
),因为这不是默认 的线程池配置文件。您可以使用 bean ID、customProfile
并在路由中引用它,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <threadPoolProfile id="customProfile" poolSize="5" maxPoolSize="5" maxQueueSize="100" /> <route> <from uri="direct:start"/> <multicast executorServiceRef="customProfile"> <to uri="mock:first"/> <to uri="mock:second"/> <to uri="mock:third"/> </multicast> </route> </camelContext>
在组件间共享线程池
某些标准的 poll-based 组件的灵活性和敏捷性,如 File 和 FTPTOKEN-timer,允许您指定要使用的线程池。这使得不同的组件可以共享同一线程池,从而降低 JVM 中的线程总数。
例如,Apache Camel 组件参考指南 . 和 Apache Camel 组件 参考指南 中的 Ftp2 包括了 调度的ExecutorService
属性,您可以使用它来指定组件的 ExecutorService
对象。https://access.redhat.com/documentation/en-us/red_hat_fuse/7.10/html-single/apache_camel_component_reference/index#file-component
自定义线程名称
要使应用日志更易阅读,通常最好自定义线程名称(用于识别日志中的线程)。要自定义线程名称,您可以在 ExecutorServiceStrategy
类或 ExecutorServiceManager
类上调用 setThreadNamePattern
方法来配置线程名称 模式。或者,设置线程名称模式的一种简单方法是设置 CamelContext
对象上的 threadNamePattern
属性。
以下占位符可以在线程名称模式中使用:
#camelId#
-
当前
CamelContext
的名称。 #counter#
- 唯一的线程标识符,作为递增计数器实施。
#name#
- 常规 Camel 线程名称。
#longName#
- 较长的线程名称在相同时间上可以包括端点参数,以此类推。
以下是线程名称模式的典型示例:
Camel (#camelId#) thread #counter# - #name#
以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern
属性:
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>