2.8. 线程模型


Java 线程池 API

Apache Camel 线程模型基于强大的 Java 并发 API (package java.util.concurrent ),它最初在 Sun 的 JDK 1.5 中提供。此 API 中的关键接口是 ExecutorService 接口,代表线程池。使用 concurrency API,您可以创建许多不同类型的线程池,覆盖各种场景。

Apache Camel 线程池 API

Apache Camel 线程池 API 通过为 Apache Camel 应用程序中的所有线程池提供中央工厂( org.apache.camel.spi.ExecutorServiceManager 类型)。以这种方式创建线程池提供了几个优点,包括:

  • 使用实用程序类简化线程池的创建。
  • 将线程池与安全关闭集成.
  • 线程自动给定信息名称,这对于日志记录和管理很有用。

组件线程模型

有些 Apache Camel 组件 swig-wagon (如 SEDA、JMS 和 Jetty)都包括在多线程中。这些组件已使用 Apache Camel 线程模型和线程池 API 实施。

如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模型集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager 对象创建它。

处理器线程模型

默认情况下,Apache Camel 中的一些标准处理器创建自己的线程池。这些线程感知处理器也与 Apache Camel 线程模型集成,它们提供了各种选项,供您自定义它们使用的线程池。

表 2.8 “处理器线程选项” 显示在线程感知处理器内置到 Apache Camel 上控制和设置线程池的各种选项。

表 2.8. 处理器线程选项
处理器Java DSLXML DSL

聚合

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

multicast

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

recipientList

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

split

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

threads

executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy

wireTap

wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
@executorServiceRef

线程 DSL 选项

线程 处理器是一个通用的 DSL 命令,可用于将线程池引入到路由中。它支持以下选项来自定义线程池:

poolSize()
池中的最小线程数量(和初始池大小)。
maxPoolSize()
池中的最大线程数。
keepAliveTime()
如果有任何线程闲置时间超过这一时间段(以秒为单位指定),则它们将被终止。
timeUnit()
keep alive 的时间单位,使用 java.util.concurrent.TimeUnit 类型指定。
maxQueueSize()
此线程池可以存储在其传入任务队列中的最大待处理任务数量。
rejectedPolicy()
指定在传入的任务队列已满时要执行的操作。请查看 表 2.10 “线程池构建器选项”
注意

前面的线程池选项与 executorServiceRef 选项 不兼容 (例如,您无法使用这些选项覆盖 executorServiceRef 选项引用的线程池中的设置)。Apache Camel 验证 DSL 以强制执行这一点。

创建默认线程池

要为其中一个线程感知处理器创建一个默认线程池,请在 XML DSL 中使用 parallelProcessing () 子类别、Java DSL 或 parallelProcessing 属性启用 parallelProcessing 选项。

例如,在 Java DSL 中,您可以调用带有默认线程池(其中线程池用于同时处理多播目的地)的多播处理器,如下所示:

from("direct:start")
  .multicast().parallelProcessing()
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

您可以在 XML DSL 中定义相同的路由,如下所示

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <multicast parallelProcessing="true">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

默认线程池配置集设置

默认线程池由线程工厂自动创建,它从 默认线程池配置集 获取其设置。默认线程池配置集有 表 2.9 “默认线程池配置文件设置” 中显示的设置(假设应用程序代码没有修改这些设置)。

表 2.9. 默认线程池配置文件设置
线程选项默认值

maxQueueSize

1000

poolSize

10

maxPoolSize

20

keepAliveTime

60 (秒)

rejectedPolicy

CallerRuns

更改默认线程池配置集

可以更改默认的线程池配置文件设置,以便使用自定义设置创建所有后续默认线程池。您可以在 Java 或 Spring XML 中更改配置集。

例如,在 Java DSL 中,您可以自定义 default 线程池配置文件中的 poolSize 选项和 maxQueueSize 选项,如下所示:

// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();

// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...

在 XML DSL 中,您可以自定义默认线程池配置集,如下所示:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
        id="changedProfile"
        defaultProfile="true"
        poolSize="3"
        maxQueueSize="100"/>
    ...
</camelContext>

请注意,在前面的 XML DSL 示例中将 defaultProfile 属性设置为 true 非常重要,否则线程池配置集将被视为自定义线程池配置集(请参阅 “创建自定义线程池配置集”一节),而不是替换默认线程池配置集。

自定义处理器的线程池

也可以使用 executorServiceexecutorServiceRef 选项(其中使用这些选项而不是 parallelProcessing 选项)为线程感知处理器指定线程池。您可以使用两种方法来自定义处理器的线程池,如下所示:

  • 指定自定义线程池 HEKETI explicitly 创建 ExecutorService (线程池)实例,并将其传递给 executorService 选项。
  • 指定自定义线程池配置集 wagon-wagoncreate 并注册自定义线程池工厂。当您使用 executorServiceRef 选项引用这个工厂时,处理器会自动使用工厂创建自定义线程池实例。

当您将 bean ID 传递给 executorServiceRef 选项时,线程感知处理器首先尝试在 registry 中找到具有该 ID 的自定义线程池。如果没有使用该 ID 注册线程池,则处理器会尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置集实例化自定义线程池。

创建自定义线程池

自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。Apache Camel 中建议使用以下创建线程池实例的方法:

  • 使用 org.apache.camel.builder.ThreadPoolBuilder 实用程序构建线程池类。
  • 使用当前 CamelContext 中的 org.apache.camel.spi.ExecutorServiceManager 实例来创建线程池类。

最终,这两种方法之间没有区别,因为 ThreadPoolBuilder 实际上使用 ExecutorServiceManager 实例定义。通常,首选 ThreadPoolBuilder,因为它提供了一种更简单的方法。但至少有一个线程类型( ScheduledExecutorService),它们只能通过直接访问 ExecutorServiceManager 实例来创建。

表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder 类支持的选项,您可以在定义新的自定义线程池时设置。

表 2.10. 线程池构建器选项
构建器选项描述

maxQueueSize()

设置此线程池可在其传入任务队列中存储的最大待处理任务数。值 -1 指定未绑定队列。默认值取自默认线程池配置文件。

poolSize()

设置池中的最小线程数量(这也是初始池大小)。默认值取自默认线程池配置文件。

maxPoolSize()

设置池中可以的最大线程数。默认值取自默认线程池配置文件。

keepAliveTime()

如果有任何线程闲置时间超过这一时间段(以秒为单位指定),则它们将被终止。这允许线程池在负载比较轻时缩小。默认值取自默认线程池配置文件。

rejectedPolicy()

指定在传入的任务队列已满时要执行的操作。您可以指定四个可能的值:

CallerRuns
(默认值) 获取调用者线程来运行最新的传入的任务。作为副作用,此选项可防止调用者线程收到任何其他任务,直到它完成处理最新的传入的任务。
Abort
通过抛出异常中止最新的传入的任务。
discard
以静默方式丢弃最新的传入的任务。
DiscardOldest
丢弃最旧的未处理的任务,然后尝试在任务队列中排队最新的传入的任务。

build()

完成构建自定义线程池,并在指定为 build () 参数的 ID 下注册新的线程池。

在 Java DSL 中,您可以使用 ThreadPoolBuilder 定义自定义线程池,如下所示:

// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...

from("direct:start")
  .multicast().executorService(customPool)
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

您可以通过将 bean ID 传递给 executorService Ref () 选项,而不是直接将对象引用传给 executorService () 选项,而是在 registry 中查找线程池,如下所示:

// Java
from("direct:start")
  .multicast().executorServiceRef("customPool")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

在 XML DSL 中,您可以使用 threadPool 元素访问 ThreadPoolBuilder。然后,您可以使用 executorServiceRef 属性引用自定义线程池,根据 Spring registry 中的 ID 查找线程池,如下所示:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="customPool"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customPool">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

创建自定义线程池配置集

如果您有很多自定义线程池实例,您可能会发现定义自定义线程池配置集更为方便,它充当线程池的工厂。每当从线程感知处理器引用线程池配置集时,处理器会自动使用配置集创建新线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。

例如,在 Java DSL 中,您可以使用 bean ID customProfile 创建自定义线程池配置集,并从路由内引用它,如下所示:

// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
  .multicast().executorServiceRef("customProfile")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

在 XML DSL 中,使用 threadPoolProfile 元素创建自定义池配置集(您可以在其中让 defaultProfile 选项默认为 false,因为这不是默认的线程池配置集)。您可以使用 bean ID, customProfile 创建自定义线程池配置集,并从路由内引用它,如下所示:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
                id="customProfile"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customProfile">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

在组件间共享线程池

某些标准的基于 poll 的组件都基于 poll-wagon,如 File 和 FTP swig-wagonallow,您可以指定要使用的线程池。这使得不同的组件可以共享同一线程池,从而减少 JVM 中线程的整体数量。

例如,Apache Camel 组件参考指南 中的 File2Apache Camel 组件参考指南 中的 Ftp2 都公开 scheduledExecutorService 属性,您可以使用它来指定组件的 ExecutorService 对象。

自定义线程名称

要使应用日志更易读,通常最好自定义线程名称(用于标识日志中线程)。要自定义线程名称,您可以通过在 ExecutorServiceStrategy 类或 ExecutorServiceManager 类上调用 setThreadNamePattern 方法来配置线程名称 模式。另外,设置线程名称模式的一种更容易方法是在 CamelContext 对象中设置 threadNamePattern 属性。

以下占位符可以在线程名称模式中使用:

#camelId#
当前 CamelContext 的名称。
#counter failing
唯一线程标识符,作为递增计数器实施。
#name#
常规 Camel 线程名称。
#longName#
较长的线程名称可以包括端点参数,以此类推。

以下是线程名称模式的典型示例:

Camel (#camelId#) thread #counter# - #name#

以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern 属性:

<camelContext xmlns="http://camel.apache.org/schema/spring"
              threadNamePattern="Riding the thread #counter#" >
  <route>
    <from uri="seda:start"/>
    <to uri="log:result"/>
    <to uri="mock:result"/>
  </route>
</camelContext>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.