Apache Camel 线程模型基于强大的 Java 并发 API Package java.util.concurrent ,最初在 Sun 的 JDK 1.5 中提供。此 API 中的关键接口是 ExecutorService
接口,它代表一个线程池。使用并发 API,您可以创建多种不同类型的线程池,涵盖广泛的场景。
Apache Camel 线程池 API 基于 Java 并发 API 构建,方法是为 Apache Camel 应用程序中的所有线程池提供中央工厂( org.apache.camel.spi.ExecutorServiceManager
类型)。以这种方式集中创建线程池提供了几个优点,包括:
使用实用程序类简化线程池的创建。
将线程池与正常关闭集成.
线程会自动获得丰富的名称,这对于记录和管理很有帮助。
一些 Apache Camel 组件在 multi-threaded 等,如 SEDA、JMS 和 Jetty PACKAGE-storageclassare 本质上是多线程。这些组件均使用 Apache Camel 线程模型和线程池 API 实施。
如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模型集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager
对象创建它。
在 Apache Camel 中某些标准处理器默认创建了自己的线程池。这些线程感知处理器也与 Apache Camel 线程处理模型集成,它们提供了各种选项,供您自定义其使用的线程池。
表 2.8 “处理器线程选项” 显示各种选项,用于控制和设置线程感知处理器上内置到 Apache Camel 的线程池。
Expand 表 2.8. 处理器线程选项 处理器 Java DSL XML DSL
aggregate
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
multicast
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
recipientList
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
split
parallelProcessing()
executorService()
executorServiceRef()
parallelProcessing()
executorService()
executorServiceRef()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@parallelProcessing
@executorServiceRef
@parallelProcessing
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
threads
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
wireTap
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
@executorServiceRef
@executorServiceRef
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
Show more
线程
处理器是一个通用的 DSL 命令,可用于将线程池引入到路由中。它支持以下选项来自定义线程池:
poolSize()
池中的最小线程数量(及初始池大小)。
maxPoolSize()
池中线程的最大数量。
keepAliveTime()
如果任何线程空闲的时间超过这个时间(以秒为单位指定),则终止它们。
timeUnit()
保留生存时间的时间,使用 java.util.concurrent.TimeUnit
类型进行指定。
maxQueueSize()
此线程池可以存储在其传入任务队列中的最大待处理任务数量。
rejectedPolicy()
指定传入任务队列已满时要执行的操作课程。请查看 表 2.10 “线程池构建器选项”
前面的线程池选项与 executorServiceRef
选项 不兼容 (例如:您无法使用这些选项来覆盖由 executorServiceRef
选项引用的线程池中的设置)。Apache Camel 验证 DSL 来执行此操作。
要为其中一个线程感知处理器创建默认线程池,启用 parallelProcessing
选项,使用 Java DSL 中的 parallelProcessing ()
子使用、在 XML DSL 中的 parallelProcessing 属性或 parallelProcessing
属性启用 parallelProcessing 选项。
例如,在 Java DSL 中,您可以使用默认的线程池(虚拟机监控程序同时处理多播目的地)调用多播处理器,如下所示:
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
from("direct:start")
.multicast().parallelProcessing()
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
您可以在 XML DSL 中定义相同的路由,如下所示
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<multicast parallelProcessing="true">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
默认线程池由线程工厂自动创建,从 默认的线程池配置文件 中获取其设置。默认线程池配置集在 表 2.9 “默认线程池配置集设置” 中显示设置(假设这些设置没有被应用程序代码修改)。
Expand 表 2.9. 默认线程池配置集设置 线程选项 默认值
maxQueueSize
1000
poolSize
10
maxPoolSize
20
keepAliveTime
60
(秒)
rejectedPolicy
CallerRuns
Show more
可以更改默认线程池配置文件设置,这样将使用自定义设置创建后续的所有默认线程池。您可以在 Java 或 Spring XML 中更改配置集。
例如,在 Java DSL 中,您可以自定义默认线程池配置集中的 poolSize
选项和 maxQueueSize
选项,如下所示:
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();
// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以自定义默认线程池配置集,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
请注意,在前面的 XML DSL 示例中将 defaultProfile
属性设置为 true
,否则线程池配置集将被视为自定义线程池配置文件(请参阅 “创建自定义线程池配置集”一节 ),而不是替换默认的线程池配置集。
也可以使用 executorService
或 executorServiceRef
选项来直接为线程识别处理器指定线程池(其中使用了这些选项而不是 并行处理
选项)。您可以使用两种方法来自定义处理器的线程池,如下所示:
指定自定义线程池 WWN - insufficientexplicitly 创建 ExecutorService
(线程池)实例,并将它传递到 executorService
选项。
指定自定义线程池配置集 TOKEN -create 并注册自定义线程池工厂。当您使用 executorServiceRef
选项引用此工厂时,处理器会自动使用 factory 来创建自定义线程池实例。
当您将 bean ID 传递给 executorServiceRef
选项时,线程感知处理器首先会尝试在 registry 中找到具有该 ID 的自定义线程池。如果没有使用该 ID 注册线程池,处理器将尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置文件来实例化自定义线程池。
自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。在 Apache Camel 中创建线程池实例的以下方法如下:
使用 org.apache.camel.builder.ThreadPoolBuilder
实用程序构建线程池类。
使用当前 CamelContext
中的 org.apache.camel.spi.ExecutorServiceManager
实例来创建线程池类。
最终,这两种方法之间没有区别,因为 ThreadPoolBuilder
实际上使用 ExecutorServiceManager
实例进行定义。通常,使用 ThreadPoolBuilder
是首选的,因为它提供了更简单的方法。但是,至少有一个线程( ScheduledExecutorService
)仅可通过直接访问 ExecutorServiceManager
实例来创建。
表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder
类支持的选项,您可以在定义新的自定义线程池时设置这些选项。
Expand 表 2.10. 线程池构建器选项 构建器选项 描述
maxQueueSize()
设置此线程池可以在其传入任务队列中存储的最大待处理任务数量。值 -1
指定未绑定的队列。默认值取自默认的线程池配置集。
poolSize()
设置池中最小线程数量(这也是初始池大小)。默认值取自默认的线程池配置集。
maxPoolSize()
设置池中可以的最大线程数。默认值取自默认的线程池配置集。
keepAliveTime()
如果任何线程空闲的时间超过这个时间(以秒为单位指定),则终止它们。这允许线程池在负载较轻时缩小。默认值取自默认的线程池配置集。
rejectedPolicy()
指定传入任务队列已满时要执行的操作课程。您可以指定四个可能的值:
CallerRuns
(默认值) 获取 caller 线程来运行最新的传入任务。作为副作用,此选项可防止 caller 线程接收任何更多任务,直到完成完成最新的传入任务。
Abort
通过抛出异常来中止最新的传入任务。
丢弃
以静默方式丢弃最新的传入任务。
DiscardOldest
丢弃最旧的未处理的任务,然后尝试对任务队列中的最新传入任务进行排队。
build()
完成构建自定义线程池,并在指定为 build ()
参数的 ID 下注册新的线程池。
Show more
在 Java DSL 中,您可以使用 ThreadPoolBuilder
定义自定义线程池,如下所示:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...
from("direct:start")
.multicast().executorService(customPool)
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
除了直接将 beutorService ()选项直接传递给 executorService ()
选项,您可以查询 registry 中的线程池,而是将其 bean ID 传递给 executorServiceRef ()
选项,如下所示:
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
from("direct:start")
.multicast().executorServiceRef("customPool")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,您可以使用 threadPool
元素访问 ThreadPoolBuilder
。然后,您可以使用 executorServiceRef
属性引用自定义线程池,根据 Spring registry 中的 ID 查找线程池,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPool id="customPool"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customPool">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
如果您有多个要创建的自定义线程池实例,您可能会更方便地定义自定义线程池配置文件,它充当线程池的工厂。每当您从线程感知处理器引用线程池配置集时,处理器会自动使用该配置集来创建新的线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。
例如,在 Java DSL 中,您可以使用 bean ID、CustomProfile
并在路由中引用它,如下所示:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
.multicast().executorServiceRef("customProfile")
.to("mock:first")
.to("mock:second")
.to("mock:third");
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
在 XML DSL 中,使用 threadPoolProfile
元素创建一个自定义池配置文件(您可让 defaultProfile
选项默认为 false
),因为这不是默认 的线程池配置文件。您可以使用 bean ID、customProfile
并在路由中引用它,如下所示:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="customProfile"
poolSize="5"
maxPoolSize="5"
maxQueueSize="100" />
<route>
<from uri="direct:start"/>
<multicast executorServiceRef="customProfile">
<to uri="mock:first"/>
<to uri="mock:second"/>
<to uri="mock:third"/>
</multicast>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
要使应用日志更易阅读,通常最好自定义线程名称(用于识别日志中的线程)。要自定义线程名称,您可以在 ExecutorServiceStrategy
类或 ExecutorServiceManager
类上调用 setThreadNamePattern
方法来配置线程名称 模式 。或者,设置线程名称模式的一种简单方法是设置 CamelContext
对象上的 threadNamePattern
属性。
以下占位符可以在线程名称模式中使用:
#camelId#
当前 CamelContext
的名称。
#counter#
唯一的线程标识符,作为递增计数器实施。
#name#
常规 Camel 线程名称。
#longName#
较长的线程名称在相同时间上可以包括端点参数,以此类推。
以下是线程名称模式的典型示例:
Camel (#camelId#) thread #counter# - #name#
Camel (#camelId#) thread #counter# - #name#
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern
属性:
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow