Apache Camel 线程模型基于强大的 Java 并发 API Package java.util.concurrent ,最初在 Sun 的 JDK 1.5 中提供。此 API 中的关键接口是 ExecutorService
接口,它代表线程池。使用并发 API,您可以创建许多不同的线程池,涵盖了广泛的场景。
Apache Camel 线程池 API 将基于 Java 并发 API 构建,为 Apache Camel 应用程序中的所有线程池提供中心工厂( org.apache.camel.spi.ExecutorServiceManager
类型)。以这种方式进行线程池的创建提供了几个优点,包括:
使用实用程序类简化线程池的创建。
将线程池与安全关闭集成.
线程会自动提供信息名称,这对记录和管理有益。
一些 Apache Camel 组件为 SEDA、JMS 和 Jettytty- inherent inherent多线程。这些组件已使用 Apache Camel 线程模型和线程池 API 实施。
如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模型集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager
对象创建它。
Apache Camel 中的一些标准处理器默认创建自己的线程池。这些线程感知型处理器也与 Apache Camel 线程模型集成,它们提供各种选项,供您自定义它们使用的线程池。
表 2.8 “处理器线程选项” 显示了在内置到 Apache Camel 的线程处理器上控制和设置线程池的各种选项。
Expand 表 2.8. 处理器线程选项 处理器 Java DSL XML DSL
aggregate
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
multicast
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
recipientList
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
split
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
threads
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
wireTap
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@executorServiceRef
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
Show more
线程
处理器是一种通用的 DSL 命令,可用于将线程池引入到路由中。它支持以下选项自定义线程池:
poolSize()
池中的最小线程数量(以及初始池大小)。
maxPoolSize()
池中的最大线程数量。
keepAliveTime()
如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则这些线程将被终止。
timeUnit()
使用 java.util.concurrent.TimeUnit
类型指定的时间单位。
maxQueueSize()
此线程池可以在其传入任务队列中存储的最大待处理任务数量。
rejectedPolicy()
指定在传入的任务队列已满时要执行的操作。请查看 表 2.10 “线程池构建器选项”
前面的线程池选项与 executorServiceRef
选项 不兼容 (例如,您不能使用这些选项覆盖 executorServiceRef
选项引用的线程池中的设置)。Apache Camel 验证 DSL 来执行此操作。
要为一个线程感知处理器创建默认线程池,启用 parallelProcessing
选项,使用 parallelProcessing()
子使用、Java DSL 或 parallelProcessing
属性(在 XML DSL 中)。
例如,在 Java DSL 中,您可以使用默认线程池(使用线程池同时处理多播目的地)调用多播处理器:
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以在 XML DSL 中定义与以下方式相同的路由
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
默认线程池由线程工厂自动创建,它从 默认的线程池配置集 获取其设置。默认线程池配置集在 表 2.9 “默认线程池配置集设置” 中显示的设置(假设这些设置还没有由应用程序代码修改)。
Expand 表 2.9. 默认线程池配置集设置 线程选项 默认值
maxQueueSize
1000
poolSize
10
maxPoolSize
20
keepAliveTime
60
(秒)
rejectedPolicy
CallerRuns
Show more
可以更改默认的线程池配置集设置,以便所有后续的默认线程池都使用自定义设置来创建。您可以在 Java 或 Spring XML 中更改配置集。
例如,在 Java DSL 中,您可以自定义 poolSize
选项和默认线程池配置集中的 maxQueueSize
选项,如下所示:
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以自定义默认的线程池配置集,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
请注意,在前面的 XML DSL 示例中将 defaultProfile
属性设置为 true
非常重要,否则线程池配置集将被视为自定义线程池配置集(请参阅 “创建自定义线程池配置集”一节 ),而不是替换默认的线程池配置集。
还可以使用 executorService
或 executorServiceRef
选项(其中使用这些选项而不是 parallelProcessing
选项)指定线程池。您可以使用两种方法来自定义处理器的线程池,如下所示:
指定自定义线程池 是否已创建 ExecutorService
(线程池)实例,并将它传递到 executorService
选项。
指定自定义线程池配置集 InventoryService- responsiblecreate 并注册自定义线程池工厂。当您使用 executorServiceRef
选项引用这个 factory 时,处理器会自动使用 factory 创建自定义线程池实例。
当您将 bean ID 传递给 executorServiceRef
选项时,线程识别处理器首先会尝试查找带有该 ID 在 registry 中的 ID 的自定义线程池。如果没有使用该 ID 注册了线程池,则处理器会尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置集来实例化自定义线程池。
自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。在 Apache Camel 中,建议使用以下创建线程池实例的方法:
使用 org.apache.camel.builder.ThreadPoolBuilder
实用程序构建线程池类。
使用来自当前 CamelContext
的 org.apache.camel.spi.ExecutorServiceManager
实例来创建线程池类。
最后,这两种方法之间没有区别,因为 ThreadPoolBuilder
实际使用 ExecutorServiceManager
实例定义。通常,最好使用 ThreadPoolBuilder
,因为它提供了一个更简单的方法。但是,至少有一种线程( ScheduledExecutorService
)只能通过直接访问 ExecutorServiceManager
实例来创建。
表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder
类支持的选项,您可以在定义新的自定义线程池时设置这些选项。
Expand 表 2.10. 线程池构建器选项 构建器选项 描述
maxQueueSize()
设置此线程池可以存储在其传入任务队列中的最大待处理任务数量。值 -1
指定未绑定的队列。默认值从默认的线程池配置集中提取。
poolSize()
设置池中最小线程数量(这也是初始池大小)。默认值从默认的线程池配置集中提取。
maxPoolSize()
设置池中可以的最大线程数量。默认值从默认的线程池配置集中提取。
keepAliveTime()
如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则这些线程将被终止。这样,当负载较正常时,线程池可以缩小。默认值从默认的线程池配置集中提取。
rejectedPolicy()
指定在传入的任务队列已满时要执行的操作。您可以指定 4 个可能值:
CallerRuns
(默认值) 获取调用器线程以运行最新的传入任务。这个问题的一个副作用是,此选项可防止调用者线程接收更多任务,直到完成最新的传入任务。
Abort
通过抛出异常以中止最新的传入任务。
discard
以静默方式丢弃最新的传入任务。
DiscardOldest
丢弃最旧的未处理的任务,然后尝试在任务队列中放入最新的传入的任务。
build()
完成构建自定义线程池,并将新线程池注册到 build()
的 ID 下。
Show more
在 Java DSL 中,您可以使用 ThreadPoolBuilder
定义自定义线程池,如下所示:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以通过将 bean ID 传递给 executorServiceRef()
选项,而不必将对象引用 customPool
直接传递给 executorService()
选项,如下所示:
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以使用 threadPool
元素访问 ThreadPoolBuilder
。然后,您可以使用 executorServiceRef
属性来引用自定义线程池,以便在 Spring registry 中根据 ID 查找线程池,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
如果您有多个要创建的自定义线程池实例,您可能会发现它更加方便地定义自定义线程池配置集,该配置集充当线程池的工厂。每当您引用线程感知处理器的线程池配置集时,处理器会自动使用配置集来创建新的线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。
例如,在 Java DSL 中,您可以使用 bean ID customProfile
并从路由中引用它,如下所示:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,使用 threadPoolProfile
元素创建自定义池配置集(其中,默认Profile
选项默认为 false
,因为这 不是默认 的线程池配置集)。您可以使用 bean ID customProfile
创建自定义线程池配置集,并从路由中引用它,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
某些标准基于轮询的组件(如 File 和 FTP1028-jaxballow)要指定要使用的线程池。这使得不同组件可以共享同一线程池,从而减少了 JVM 中的总线程数量。
例如,Apache Camel 组件参考指南 中的 see File2 和 Apache Camel 组件参考指南 中的 Ftp2 均公开 scheduledExecutorService
属性,您可以使用它来指定组件的 ExecutorService
对象。
要使应用程序日志更易阅读,通常最好自定义线程名称(用于识别日志中的线程)。要自定义线程名称,您可以通过在 ExecutorServiceStrategy
类或 ExecutorServiceManager
类上调用 setThreadNamePattern
方法来配置线程名称 模式 。另外,设置线程名称模式的更简单方法是在 CamelContext
对象中设置 threadNamePattern
属性。
以下占位符可以在线程名称模式中使用:
#camelId#
当前 CamelContext
的名称。
#counter#
唯一的线程标识符,作为递增计数器实施。
#name#
常规 Camel 线程名称。
#longName#
长线程名称 3.10.0-->_< 可以包含端点参数,以此类推。
以下是线程名称模式的典型示例:
Camel (#camelId#) thread #counter# - #name#
Camel (#camelId#) thread #counter# - #name#
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern
属性:
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow