Apache Camel 线程模型基于强大的 Java 并发 API (package java.util.concurrent ),它最初在 Sun 的 JDK 1.5 中提供。此 API 中的关键接口是 ExecutorService
接口,代表线程池。使用 concurrency API,您可以创建许多不同类型的线程池,覆盖各种场景。
Apache Camel 线程池 API 通过为 Apache Camel 应用程序中的所有线程池提供中央工厂( org.apache.camel.spi.ExecutorServiceManager
类型)。以这种方式创建线程池提供了几个优点,包括:
-
使用实用程序类简化线程池的创建。
-
将线程池与安全关闭集成.
-
线程自动给定信息名称,这对于日志记录和管理很有用。
有些 Apache Camel 组件 swig-wagon (如 SEDA、JMS 和 Jetty)都包括在多线程中。这些组件已使用 Apache Camel 线程模型和线程池 API 实施。
如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模型集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager
对象创建它。
默认情况下,Apache Camel 中的一些标准处理器创建自己的线程池。这些线程感知处理器也与 Apache Camel 线程模型集成,它们提供了各种选项,供您自定义它们使用的线程池。
表 2.8 “处理器线程选项” 显示在线程感知处理器内置到 Apache Camel 上控制和设置线程池的各种选项。
Expand表 2.8. 处理器线程选项处理器 | Java DSL | XML DSL |
---|
聚合
|
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
multicast
|
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
recipientList
|
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
split
|
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
threads
|
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
wireTap
|
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
@executorServiceRef
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
|
线程
处理器是一个通用的 DSL 命令,可用于将线程池引入到路由中。它支持以下选项来自定义线程池:
poolSize()
-
池中的最小线程数量(和初始池大小)。
maxPoolSize()
-
池中的最大线程数。
keepAliveTime()
-
如果有任何线程闲置时间超过这一时间段(以秒为单位指定),则它们将被终止。
timeUnit()
-
keep alive 的时间单位,使用
java.util.concurrent.TimeUnit
类型指定。
maxQueueSize()
-
此线程池可以存储在其传入任务队列中的最大待处理任务数量。
rejectedPolicy()
-
指定在传入的任务队列已满时要执行的操作。请查看 表 2.10 “线程池构建器选项”
前面的线程池选项与 executorServiceRef
选项 不兼容 (例如,您无法使用这些选项覆盖 executorServiceRef
选项引用的线程池中的设置)。Apache Camel 验证 DSL 以强制执行这一点。
要为其中一个线程感知处理器创建一个默认线程池,请在 XML DSL 中使用 parallelProcessing
()
子类别、Java DSL 或 parallelProcessing
属性启用 parallelProcessing 选项。
例如,在 Java DSL 中,您可以调用带有默认线程池(其中线程池用于同时处理多播目的地)的多播处理器,如下所示:
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以在 XML DSL 中定义相同的路由,如下所示
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
默认线程池由线程工厂自动创建,它从 默认线程池配置集 获取其设置。默认线程池配置集有 表 2.9 “默认线程池配置文件设置” 中显示的设置(假设应用程序代码没有修改这些设置)。
Expand表 2.9. 默认线程池配置文件设置线程选项 | 默认值 |
---|
maxQueueSize
|
1000
|
poolSize
|
10
|
maxPoolSize
|
20
|
keepAliveTime
|
60 (秒)
|
rejectedPolicy
|
CallerRuns
|
可以更改默认的线程池配置文件设置,以便使用自定义设置创建所有后续默认线程池。您可以在 Java 或 Spring XML 中更改配置集。
例如,在 Java DSL 中,您可以自定义 default 线程池配置文件中的 poolSize
选项和 maxQueueSize
选项,如下所示:
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以自定义默认线程池配置集,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
请注意,在前面的 XML DSL 示例中将 defaultProfile
属性设置为 true
非常重要,否则线程池配置集将被视为自定义线程池配置集(请参阅 “创建自定义线程池配置集”一节),而不是替换默认线程池配置集。
也可以使用 executorService
或 executorServiceRef
选项(其中使用这些选项而不是 parallelProcessing
选项)为线程感知处理器指定线程池。您可以使用两种方法来自定义处理器的线程池,如下所示:
-
指定自定义线程池 HEKETI explicitly 创建
ExecutorService
(线程池)实例,并将其传递给 executorService
选项。
-
指定自定义线程池配置集 wagon-wagoncreate 并注册自定义线程池工厂。当您使用
executorServiceRef
选项引用这个工厂时,处理器会自动使用工厂创建自定义线程池实例。
当您将 bean ID 传递给 executorServiceRef
选项时,线程感知处理器首先尝试在 registry 中找到具有该 ID 的自定义线程池。如果没有使用该 ID 注册线程池,则处理器会尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置集实例化自定义线程池。
自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。Apache Camel 中建议使用以下创建线程池实例的方法:
-
使用
org.apache.camel.builder.ThreadPoolBuilder
实用程序构建线程池类。
-
使用当前
CamelContext
中的 org.apache.camel.spi.ExecutorServiceManager
实例来创建线程池类。
最终,这两种方法之间没有区别,因为 ThreadPoolBuilder
实际上使用 ExecutorServiceManager
实例定义。通常,首选 ThreadPoolBuilder
,因为它提供了一种更简单的方法。但至少有一个线程类型( ScheduledExecutorService
),它们只能通过直接访问 ExecutorServiceManager
实例来创建。
表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder
类支持的选项,您可以在定义新的自定义线程池时设置。
Expand表 2.10. 线程池构建器选项构建器选项 | 描述 |
---|
maxQueueSize()
|
设置此线程池可在其传入任务队列中存储的最大待处理任务数。值 -1 指定未绑定队列。默认值取自默认线程池配置文件。
|
poolSize()
|
设置池中的最小线程数量(这也是初始池大小)。默认值取自默认线程池配置文件。
|
maxPoolSize()
|
设置池中可以的最大线程数。默认值取自默认线程池配置文件。
|
keepAliveTime()
|
如果有任何线程闲置时间超过这一时间段(以秒为单位指定),则它们将被终止。这允许线程池在负载比较轻时缩小。默认值取自默认线程池配置文件。
|
rejectedPolicy()
|
指定在传入的任务队列已满时要执行的操作。您可以指定四个可能的值:
CallerRuns -
(默认值) 获取调用者线程来运行最新的传入的任务。作为副作用,此选项可防止调用者线程收到任何其他任务,直到它完成处理最新的传入的任务。
Abort -
通过抛出异常中止最新的传入的任务。
discard -
以静默方式丢弃最新的传入的任务。
DiscardOldest -
丢弃最旧的未处理的任务,然后尝试在任务队列中排队最新的传入的任务。
|
build()
|
完成构建自定义线程池,并在指定为 build () 参数的 ID 下注册新的线程池。
|
在 Java DSL 中,您可以使用 ThreadPoolBuilder
定义自定义线程池,如下所示:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以通过将 bean ID 传递给 executorService Ref () 选项,而不是直接将对象引用传给 executorService ()
选项,而是在 registry 中查找线程池,如下所示:
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以使用 threadPool
元素访问 ThreadPoolBuilder
。然后,您可以使用 executorServiceRef
属性引用自定义线程池,根据 Spring registry 中的 ID 查找线程池,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
如果您有很多自定义线程池实例,您可能会发现定义自定义线程池配置集更为方便,它充当线程池的工厂。每当从线程感知处理器引用线程池配置集时,处理器会自动使用配置集创建新线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。
例如,在 Java DSL 中,您可以使用 bean ID customProfile
创建自定义线程池配置集,并从路由内引用它,如下所示:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,使用 threadPoolProfile
元素创建自定义池配置集(您可以在其中让 defaultProfile
选项默认为 false
,因为这不是默认的线程池配置集)。您可以使用 bean ID, customProfile
创建自定义线程池配置集,并从路由内引用它,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
某些标准的基于 poll 的组件都基于 poll-wagon,如 File 和 FTP swig-wagonallow,您可以指定要使用的线程池。这使得不同的组件可以共享同一线程池,从而减少 JVM 中线程的整体数量。
例如,Apache Camel 组件参考指南 中的 File2 和 Apache Camel 组件参考指南 中的 Ftp2 都公开 scheduledExecutorService
属性,您可以使用它来指定组件的 ExecutorService
对象。
要使应用日志更易读,通常最好自定义线程名称(用于标识日志中线程)。要自定义线程名称,您可以通过在 ExecutorServiceStrategy
类或 ExecutorServiceManager
类上调用 setThreadNamePattern
方法来配置线程名称 模式。另外,设置线程名称模式的一种更容易方法是在 CamelContext
对象中设置 threadNamePattern
属性。
以下占位符可以在线程名称模式中使用:
#camelId#
-
当前
CamelContext
的名称。
#counter failing
-
唯一线程标识符,作为递增计数器实施。
#name#
-
常规 Camel 线程名称。
#longName#
-
较长的线程名称可以包括端点参数,以此类推。
以下是线程名称模式的典型示例:
Camel (#camelId#) thread #counter# - #name#
Camel (#camelId#) thread #counter# - #name#
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern
属性:
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow