Apache Camel 线程模型基于强大的 Java 并发 API 软件包 java.util.concurrent ,它最初在 Sun 的 JDK 1.5 中提供。此 API 中的密钥接口是 ExecutorService
接口,它代表一个线程池。使用并发 API,您可以创建许多不同类型的线程池,涵盖各种场景。
Apache Camel 线程池 API 基于 Java 并发 API 构建,方法是为您的 Apache Camel 应用程序中的所有线程池提供中央工厂( org.apache.camel.spi.ExecutorServiceManager
类型)。以这种方式集中创建线程池提供了几个优点,包括:
使用实用程序类简化线程池的创建。
将线程池与安全关闭集成。
线程自动给出信息性名称,这对于日志记录和管理非常有用。
一些 Apache Camel 组件是型的,如 SEDA、JMS 和 Jetty iwl-osgi 是以多线程方式本质上是多线程的。这些组件都使用 Apache Camel 线程模型和线程池 API 实施。
如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模式集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager
对象创建它。
默认情况下,Apache Camel 中的一些标准处理器创建自己的线程池。这些线程感知型处理器也与 Apache Camel 线程模型集成,它们提供了各种选项,供您自定义它们使用的线程池。
表 2.8 “处理器线程选项” 显示在 Apache Camel 内置的线程感知处理器上控制和设置线程池的各种选项。
Expand 表 2.8. 处理器线程选项 处理器 Java DSL XML DSL
聚合
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
multicast
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
recipientList
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
split
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
threads
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
wireTap
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@executorServiceRef
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
Show more
线程
处理器是一个通用的 DSL 命令,可用于将线程池引入路由。它支持以下选项来自定义线程池:
poolSize()
池中的最小线程数量(和初始池大小)。
maxPoolSize()
池中的最大线程数量。
keepAliveTime()
如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则终止它们。
timeUnit()
keep alive 的时间单位,使用 java.util.concurrent.TimeUnit
类型指定。
maxQueueSize()
此线程池可以存储在其传入任务队列中的最大待处理任务数量。
rejectedPolicy()
指定在传入的任务队列满时要采取哪些操作课程。请查看 表 2.10 “线程池构建器选项”
前面的线程池选项与 executorServiceRef
选项 不兼容 (例如,您无法使用这些选项来覆盖 executorServiceRef
选项引用的线程池中的设置)。Apache Camel 验证 DSL 以执行此操作。
要为一个线程感知的处理器创建默认线程池,请在 XML DSL 中使用 parallelProcessing
()
sub-clause, 或 parallelProcessing
属性启用 parallelProcessing 选项。
例如,在 Java DSL 中,您可以使用默认线程池调用多播处理器(线程池用于同时处理多播目的地),如下所示:
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以在 XML DSL 中定义相同的路由,如下所示
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
默认线程池由线程工厂自动创建,该工厂从默认的 线程池配置文件 获取其设置。默认线程池配置集在 表 2.9 “默认线程池配置文件设置” 中显示的设置(假设这些设置还没有由应用程序代码修改)。
Expand 表 2.9. 默认线程池配置文件设置 线程选项 默认值
maxQueueSize
1000
poolSize
10
maxPoolSize
20
keepAliveTime
60
(秒)
rejectedPolicy
CallerRuns
Show more
可以更改默认线程池配置集设置,以便使用自定义设置创建后续所有默认线程池。您可以在 Java 中或 Spring XML 中更改配置集。
例如,在 Java DSL 中,您可以自定义 default 线程池配置文件中的 poolSize
选项和 maxQueueSize
选项,如下所示:
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以自定义默认线程池配置集,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
请注意,在前面的 XML DSL 示例中,务必要将 defaultProfile
属性设置为 true
,否则线程池配置集将被视为自定义线程池配置文件(请参阅 “创建自定义线程池配置集”一节 ),而不是替换默认的线程池配置集。
也可以使用 executorService
或 executorServiceRef
选项(其中使用这些选项而不是 parallelProcessing
选项)为线程感知处理器指定线程池。您可以使用两种方法来自定义处理器的线程池,如下所示:
指定自定义线程池 criu -wagonexplicitly 创建一个 ExecutorService
(线程池)实例,并将其传递给 executorService
选项。
指定自定义线程池配置集 criu -wagoncreate,并注册自定义线程池工厂。当您使用 executorServiceRef
选项引用此工厂时,处理器会自动使用工厂创建自定义线程池实例。
当您将 bean ID 传递给 executorServiceRef
选项时,线程感知处理器首先会尝试在 registry 中使用该 ID 查找带有该 ID 的自定义线程池。如果没有使用该 ID 注册线程池,处理器会尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置集实例化自定义线程池。
自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。Apache Camel 中建议使用以下创建线程池实例的方法:
使用 org.apache.camel.builder.ThreadPoolBuilder
实用程序构建线程池类。
使用当前 CamelContext
中的 org.apache.camel.spi.ExecutorServiceManager
实例来创建线程池类。
最终,这两种方法之间没有区别,因为 ThreadPoolBuilder
实际上是使用 ExecutorServiceManager
实例定义的。通常,ThreadPoolBuilder
是首选的,因为它提供了一种更简单的方法。但是,至少有一个线程( ScheduledExecutorService
)只能通过访问 ExecutorService
实例目录来创建。
表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder
类支持的选项,您可以在定义新的自定义线程池时设置这些选项。
Expand 表 2.10. 线程池构建器选项 构建器选项 描述
maxQueueSize()
设置此线程池可在其传入任务队列中存储的最大待处理任务数量。值 -1
指定未绑定的队列。默认值取自默认的线程池配置文件。
poolSize()
设置池中最少的线程数(这也是初始池大小)。默认值取自默认的线程池配置文件。
maxPoolSize()
设置池中可以的最大线程数。默认值取自默认的线程池配置文件。
keepAliveTime()
如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则终止它们。这允许线程池在负载为 light 时缩小。默认值取自默认的线程池配置文件。
rejectedPolicy()
指定在传入的任务队列满时要采取哪些操作课程。您可以指定四个可能的值:
CallerRuns
(默认值) 获取调用者线程来运行最新的传入任务。作为副作用,此选项可防止调用者线程接收任何更多任务,直到处理最新的传入的任务为止。
Abort
通过抛出异常来中止最新的传入任务。
discard
以静默方式丢弃最新的传入的任务。
DiscardOldest
丢弃最旧的未处理的任务,然后尝试对任务队列中的最新传入的任务进行排队。
build()
完成构建自定义线程池,并在指定为 build ()
参数的 ID 下注册新的线程池。
Show more
在 Java DSL 中,您可以使用 ThreadPoolBuilder
定义自定义线程池,如下所示:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以通过将对象 ID 传递给 executorServiceRef ()选项,而不是将对象引用(Custom Pool
)传递给 executorService
Ref ()选项,而是在 registry 中查找线程池,方法是将其 bean ID 传递给 executorServiceRef ()
选项,如下所示:
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以使用 threadPool
元素访问 ThreadPoolBuilder
。然后,您可以使用 executorServiceRef
属性引用自定义线程池,来根据 Spring registry 中的 ID 查找线程池,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
如果您有许多自定义线程池实例来创建,您可能会发现定义自定义线程池配置集更方便,它充当线程池的工厂。每当您从线程感知的处理器引用线程池配置集时,处理器会自动使用配置集来创建新的线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。
例如,在 Java DSL 中,您可以创建一个带有 bean ID ( customProfile
)的自定义线程池配置集,并从路由中引用它,如下所示:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,使用 threadPoolProfile
元素创建自定义池配置集(其中,默认Profile
选项默认为 false
,因为这不是默认的线程池配置集)。 您可以使用 bean ID customProfile
创建自定义线程池配置集,并从路由中引用它,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
一些基于标准轮询的组件,如 File 和 FTP iwl-setuptoolsallow,以指定要使用的线程池。这使得不同的组件能够共享同一线程池,从而减少 JVM 中的线程总数。
例如,Apache Camel 组件参考指南 中的 see File2 和 Apache Camel 组件参考指南 中的 Ftp2 都公开 scheduledExecutorService
属性,您可以使用它来指定组件的 ExecutorService
对象。
为了让应用程序日志更易读,通常最好自定义线程名称(用于识别日志中线程)。要自定义线程名称,您可以通过调用 ExecutorServiceStrategy
类或 ExecutorServiceManager
类上的 setThreadNamePattern
方法来配置线程名称 模式 。或者,设置线程名称模式的一种更简单方法是设置 CamelContext
对象上的 threadNamePattern
属性。
以下占位符可用于线程名称模式:
#camelId#
当前 CamelContext
的名称。
#counter#
唯一的线程标识符,作为递增计数器实施。
#name#
常规 Camel 线程名称。
#longName#
较长的线程名称可以包括端点参数,以此类推。
以下是线程名称模式的典型示例:
Camel (#camelId#) thread #counter# - #name#
Camel (#camelId#) thread #counter# - #name#
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern
属性:
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow