2.8. 线程模型
Java 线程池 API 复制链接链接已复制到粘贴板!
Apache Camel 线程模型基于强大的 Java 并发 API Package java.util.concurrent,最初在 Sun 的 JDK 1.5 中提供。此 API 中的关键接口是 ExecutorService 接口,它代表线程池。使用并发 API,您可以创建许多不同的线程池,涵盖了广泛的场景。
Apache Camel 线程池 API 复制链接链接已复制到粘贴板!
Apache Camel 线程池 API 将基于 Java 并发 API 构建,为 Apache Camel 应用程序中的所有线程池提供中心工厂( org.apache.camel.spi.ExecutorServiceManager 类型)。以这种方式进行线程池的创建提供了几个优点,包括:
- 使用实用程序类简化线程池的创建。
- 将线程池与安全关闭集成.
- 线程会自动提供信息名称,这对记录和管理有益。
组件线程模型 复制链接链接已复制到粘贴板!
一些 Apache Camel 组件为 SEDA、JMS 和 Jettytty- inherent inherent多线程。这些组件已使用 Apache Camel 线程模型和线程池 API 实施。
如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模型集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager 对象创建它。
处理器线程模型 复制链接链接已复制到粘贴板!
Apache Camel 中的一些标准处理器默认创建自己的线程池。这些线程感知型处理器也与 Apache Camel 线程模型集成,它们提供各种选项,供您自定义它们使用的线程池。
表 2.8 “处理器线程选项” 显示了在内置到 Apache Camel 的线程处理器上控制和设置线程池的各种选项。
| 处理器 | Java DSL | XML DSL |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
线程 DSL 选项 复制链接链接已复制到粘贴板!
线程 处理器是一种通用的 DSL 命令,可用于将线程池引入到路由中。它支持以下选项自定义线程池:
poolSize()- 池中的最小线程数量(以及初始池大小)。
maxPoolSize()- 池中的最大线程数量。
keepAliveTime()- 如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则这些线程将被终止。
timeUnit()-
使用
java.util.concurrent.TimeUnit类型指定的时间单位。 maxQueueSize()- 此线程池可以在其传入任务队列中存储的最大待处理任务数量。
rejectedPolicy()- 指定在传入的任务队列已满时要执行的操作。请查看 表 2.10 “线程池构建器选项”
前面的线程池选项与 executorServiceRef 选项 不兼容 (例如,您不能使用这些选项覆盖 executorServiceRef 选项引用的线程池中的设置)。Apache Camel 验证 DSL 来执行此操作。
创建默认线程池 复制链接链接已复制到粘贴板!
要为一个线程感知处理器创建默认线程池,启用 parallelProcessing 选项,使用 parallelProcessing() 子使用、Java DSL 或 parallelProcessing 属性(在 XML DSL 中)。
例如,在 Java DSL 中,您可以使用默认线程池(使用线程池同时处理多播目的地)调用多播处理器:
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
您可以在 XML DSL 中定义与以下方式相同的路由
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
默认线程池配置集设置 复制链接链接已复制到粘贴板!
默认线程池由线程工厂自动创建,它从 默认的线程池配置集 获取其设置。默认线程池配置集在 表 2.9 “默认线程池配置集设置” 中显示的设置(假设这些设置还没有由应用程序代码修改)。
| 线程选项 | 默认值 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
更改默认线程池配置集 复制链接链接已复制到粘贴板!
可以更改默认的线程池配置集设置,以便所有后续的默认线程池都使用自定义设置来创建。您可以在 Java 或 Spring XML 中更改配置集。
例如,在 Java DSL 中,您可以自定义 poolSize 选项和默认线程池配置集中的 maxQueueSize 选项,如下所示:
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
在 XML DSL 中,您可以自定义默认的线程池配置集,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
请注意,在前面的 XML DSL 示例中将 defaultProfile 属性设置为 true 非常重要,否则线程池配置集将被视为自定义线程池配置集(请参阅 “创建自定义线程池配置集”一节),而不是替换默认的线程池配置集。
自定义处理器的线程池 复制链接链接已复制到粘贴板!
还可以使用 executorService 或 executorServiceRef 选项(其中使用这些选项而不是 parallelProcessing 选项)指定线程池。您可以使用两种方法来自定义处理器的线程池,如下所示:
-
指定自定义线程池 是否已创建
ExecutorService(线程池)实例,并将它传递到executorService选项。 -
指定自定义线程池配置集 InventoryService- responsiblecreate 并注册自定义线程池工厂。当您使用
executorServiceRef选项引用这个 factory 时,处理器会自动使用 factory 创建自定义线程池实例。
当您将 bean ID 传递给 executorServiceRef 选项时,线程识别处理器首先会尝试查找带有该 ID 在 registry 中的 ID 的自定义线程池。如果没有使用该 ID 注册了线程池,则处理器会尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置集来实例化自定义线程池。
创建自定义线程池 复制链接链接已复制到粘贴板!
自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。在 Apache Camel 中,建议使用以下创建线程池实例的方法:
-
使用
org.apache.camel.builder.ThreadPoolBuilder实用程序构建线程池类。 -
使用来自当前
CamelContext的org.apache.camel.spi.ExecutorServiceManager实例来创建线程池类。
最后,这两种方法之间没有区别,因为 ThreadPoolBuilder 实际使用 ExecutorServiceManager 实例定义。通常,最好使用 ThreadPoolBuilder,因为它提供了一个更简单的方法。但是,至少有一种线程( ScheduledExecutorService)只能通过直接访问 ExecutorServiceManager 实例来创建。
表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder 类支持的选项,您可以在定义新的自定义线程池时设置这些选项。
| 构建器选项 | 描述 |
|---|---|
|
|
设置此线程池可以存储在其传入任务队列中的最大待处理任务数量。值 |
|
| 设置池中最小线程数量(这也是初始池大小)。默认值从默认的线程池配置集中提取。 |
|
| 设置池中可以的最大线程数量。默认值从默认的线程池配置集中提取。 |
|
| 如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则这些线程将被终止。这样,当负载较正常时,线程池可以缩小。默认值从默认的线程池配置集中提取。 |
|
| 指定在传入的任务队列已满时要执行的操作。您可以指定 4 个可能值:
|
|
|
完成构建自定义线程池,并将新线程池注册到 |
在 Java DSL 中,您可以使用 ThreadPoolBuilder 定义自定义线程池,如下所示:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
您可以通过将 bean ID 传递给 executorServiceRef() 选项,而不必将对象引用 customPool 直接传递给 executorService() 选项,如下所示:
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
在 XML DSL 中,您可以使用 threadPool 元素访问 ThreadPoolBuilder。然后,您可以使用 executorServiceRef 属性来引用自定义线程池,以便在 Spring registry 中根据 ID 查找线程池,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
创建自定义线程池配置集 复制链接链接已复制到粘贴板!
如果您有多个要创建的自定义线程池实例,您可能会发现它更加方便地定义自定义线程池配置集,该配置集充当线程池的工厂。每当您引用线程感知处理器的线程池配置集时,处理器会自动使用配置集来创建新的线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。
例如,在 Java DSL 中,您可以使用 bean ID customProfile 并从路由中引用它,如下所示:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
在 XML DSL 中,使用 threadPoolProfile 元素创建自定义池配置集(其中,默认Profile 选项默认为 false,因为这 不是默认 的线程池配置集)。您可以使用 bean ID customProfile 创建自定义线程池配置集,并从路由中引用它,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
在组件间共享线程池 复制链接链接已复制到粘贴板!
某些标准基于轮询的组件(如 File 和 FTP1028-jaxballow)要指定要使用的线程池。这使得不同组件可以共享同一线程池,从而减少了 JVM 中的总线程数量。
例如,Apache Camel 组件参考指南 中的 see File2 和 Apache Camel 组件参考指南 中的 Ftp2 均公开 scheduledExecutorService 属性,您可以使用它来指定组件的 ExecutorService 对象。
自定义线程名称 复制链接链接已复制到粘贴板!
要使应用程序日志更易阅读,通常最好自定义线程名称(用于识别日志中的线程)。要自定义线程名称,您可以通过在 ExecutorServiceStrategy 类或 ExecutorServiceManager 类上调用 setThreadNamePattern 方法来配置线程名称 模式。另外,设置线程名称模式的更简单方法是在 CamelContext 对象中设置 threadNamePattern 属性。
以下占位符可以在线程名称模式中使用:
#camelId#-
当前
CamelContext的名称。 #counter#- 唯一的线程标识符,作为递增计数器实施。
#name#- 常规 Camel 线程名称。
#longName#- 长线程名称 3.10.0-->_< 可以包含端点参数,以此类推。
以下是线程名称模式的典型示例:
Camel (#camelId#) thread #counter# - #name#
以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern 属性:
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>