2.8. Threading Model
Java thread pool API
The Apache Camel threading model is based on the Java concurrency API (the java.util.concurrent package), which first became available in JDK 1.5. The key interface in this API is ExecutorService, which represents a thread pool. Using the concurrency API, you can create many different kinds of thread pool, covering a wide range of scenarios.
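As a brief illustration of the plain Java API that Apache Camel builds on, the following sketch creates a small fixed-size pool, submits one task, and shuts the pool down. It uses only java.util.concurrent. The class name, the pool size, and the task are chosen for illustration and are not part of Apache Camel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of Apache Camel.
class ExecutorServiceSketch {

    // Runs one task on a two-thread pool and returns the task's result.
    static int runOnPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> result = pool.submit(() -> 6 * 7);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            // Stop accepting new tasks; tasks already submitted still finish.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool());
    }
}
```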
Apache Camel thread pool API
The Apache Camel thread pool API builds on the Java concurrency API by providing a central factory (of org.apache.camel.spi.ExecutorServiceManager type) for all of the thread pools in your Apache Camel application. Centralising the creation of thread pools in this way provides several advantages, including:
- Simplified creation of thread pools, using utility classes.
- Integration of thread pools with graceful shutdown.
- Automatic assignment of informative thread names, which helps with logging and management.
Component threading model
Some Apache Camel components—such as SEDA, JMS, and Jetty—are inherently multi-threaded. These components have all been implemented using the Apache Camel threading model and thread pool API.
If you are planning to implement your own Apache Camel component, it is recommended that you integrate your threading code with the Apache Camel threading model. For example, if your component needs a thread pool, it is recommended that you create it using the CamelContext's ExecutorServiceManager object.
Processor threading model
Some of the standard processors in Apache Camel create their own thread pool by default. These threading-aware processors are also integrated with the Apache Camel threading model and they provide various options that enable you to customize the thread pools that they use.
Table 2.8, “Processor Threading Options” shows the various options for controlling and setting thread pools on the threading-aware processors built into Apache Camel.
Processor | Java DSL | XML DSL |
---|---|---|
aggregate | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
multicast | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
recipientList | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
split | parallelProcessing(), executorService(), executorServiceRef() | @parallelProcessing, @executorServiceRef |
threads | executorService(), executorServiceRef(), poolSize(), maxPoolSize(), keepAliveTime(), timeUnit(), maxQueueSize(), rejectedPolicy() | @executorServiceRef, @poolSize, @maxPoolSize, @keepAliveTime, @timeUnit, @maxQueueSize, @rejectedPolicy |
wireTap | wireTap(String uri, ExecutorService executorService), wireTap(String uri, String executorServiceRef) | @executorServiceRef |
threads DSL options
The threads processor is a general-purpose DSL command, which you can use to introduce a thread pool into a route. It supports the following options to customize the thread pool:
poolSize()
- Minimum number of threads in the pool (and initial pool size).
maxPoolSize()
- Maximum number of threads in the pool.
keepAliveTime()
- If any threads are idle for longer than this period of time (specified in seconds), they are terminated.
timeUnit()
- Time unit for the keep-alive time, specified using the java.util.concurrent.TimeUnit type.
maxQueueSize()
- Maximum number of pending tasks that this thread pool can store in its incoming task queue.
rejectedPolicy()
- Specifies what course of action to take if the incoming task queue is full. See Table 2.10, “Thread Pool Builder Options”.
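To make the options above more concrete, the following sketch shows how they line up with the constructor arguments of the JDK's java.util.concurrent.ThreadPoolExecutor. This is an illustration in plain Java under the assumption that the options correspond to the JDK settings of the same meaning. It is not Apache Camel's own implementation, and the class and method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; shows how the options relate to the JDK constructor.
class ThreadsOptionsSketch {

    static ThreadPoolExecutor newPool(int poolSize, int maxPoolSize,
                                      long keepAliveTime, TimeUnit timeUnit,
                                      int maxQueueSize) {
        return new ThreadPoolExecutor(
                poolSize,                                   // poolSize()
                maxPoolSize,                                // maxPoolSize()
                keepAliveTime, timeUnit,                    // keepAliveTime(), timeUnit()
                new LinkedBlockingQueue<>(maxQueueSize),    // maxQueueSize()
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejectedPolicy()
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(5, 10, 60, TimeUnit.SECONDS, 100);
        System.out.println(pool.getCorePoolSize() + " to "
                + pool.getMaximumPoolSize() + " threads");
        pool.shutdown();
    }
}
```

One JDK detail worth keeping in mind when choosing values: a ThreadPoolExecutor only grows beyond its core size once the task queue is full, so a large queue combined with a small core size can mean that the maximum pool size is rarely reached.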
Note
The preceding thread pool options are not compatible with the executorServiceRef option (for example, you cannot use these options to override the settings in the thread pool referenced by an executorServiceRef option). Apache Camel validates the DSL to enforce this.
Creating a default thread pool
To create a default thread pool for one of the threading-aware processors, enable the parallelProcessing option, using the parallelProcessing() sub-clause in the Java DSL or the parallelProcessing attribute in the XML DSL.
For example, in the Java DSL, you can invoke the multicast processor with a default thread pool (where the thread pool is used to process the multicast destinations concurrently) as follows:
from("direct:start")
    .multicast().parallelProcessing()
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
You can define the same route in XML DSL as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <multicast parallelProcessing="true">
      <to uri="mock:first"/>
      <to uri="mock:second"/>
      <to uri="mock:third"/>
    </multicast>
  </route>
</camelContext>
Default thread pool profile settings
The default thread pools are automatically created by a thread factory that takes its settings from the default thread pool profile. The default thread pool profile has the settings shown in Table 2.9, “Default Thread Pool Profile Settings” (assuming that these settings have not been modified by the application code).
Thread Option | Default Value |
---|---|
maxQueueSize | 1000 |
poolSize | 10 |
maxPoolSize | 20 |
keepAliveTime | 60 (seconds) |
rejectedPolicy | CallerRuns |
Changing the default thread pool profile
It is possible to change the default thread pool profile settings, so that all subsequent default thread pools will be created with the custom settings. You can change the profile either in Java or in Spring XML.
For example, in the Java DSL, you can customize the poolSize option and the maxQueueSize option in the default thread pool profile, as follows:
// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();

// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...
In the XML DSL, you can customize the default thread pool profile, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <threadPoolProfile
      id="changedProfile"
      defaultProfile="true"
      poolSize="3"
      maxQueueSize="100"/>
  ...
</camelContext>
Note that it is essential to set the defaultProfile attribute to true in the preceding XML DSL example. Otherwise, the thread pool profile would be treated like a custom thread pool profile (see the section called “Creating a custom thread pool profile”), instead of replacing the default thread pool profile.
Customizing a processor's thread pool
It is also possible to specify the thread pool for a threading-aware processor more directly, using either the executorService or the executorServiceRef option (where these options are used instead of the parallelProcessing option). There are two approaches you can use to customize a processor's thread pool:
- Specify a custom thread pool: explicitly create an ExecutorService (thread pool) instance and pass it to the executorService option.
- Specify a custom thread pool profile: create and register a custom thread pool profile, which acts as a factory for thread pools. When you reference this profile using the executorServiceRef option, the processor automatically uses it to create a custom thread pool instance.
When you pass a bean ID to the executorServiceRef option, the threading-aware processor first tries to find a custom thread pool with that ID in the registry. If no thread pool is registered with that ID, the processor then attempts to look up a custom thread pool profile in the registry and uses the custom thread pool profile to instantiate a custom thread pool.
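The two-step lookup described above can be sketched in plain Java as follows. This is a simplified stand-in that assumes two maps in place of the real registry. The class ExecutorRefLookupSketch and its methods are hypothetical names, and a profile is modelled as a Supplier that creates a new pool on each call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical stand-in for the registry lookup; not Camel's implementation.
class ExecutorRefLookupSketch {
    private final Map<String, ExecutorService> pools = new HashMap<>();
    private final Map<String, Supplier<ExecutorService>> profiles = new HashMap<>();

    void registerPool(String id, ExecutorService pool) {
        pools.put(id, pool);
    }

    void registerProfile(String id, Supplier<ExecutorService> profile) {
        profiles.put(id, profile);
    }

    ExecutorService resolve(String ref) {
        // Step 1: an existing thread pool registered under this ID wins.
        ExecutorService pool = pools.get(ref);
        if (pool != null) {
            return pool;
        }
        // Step 2: otherwise, a profile with this ID is used to create a new pool.
        Supplier<ExecutorService> profile = profiles.get(ref);
        if (profile != null) {
            return profile.get();
        }
        throw new IllegalArgumentException("No thread pool or profile registered as: " + ref);
    }
}
```

In this sketch, resolving a pool ID always returns the same shared instance, whereas resolving a profile ID creates a fresh pool each time.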
Creating a custom thread pool
A custom thread pool can be any thread pool of java.util.concurrent.ExecutorService type. The following approaches to creating a thread pool instance are recommended in Apache Camel:
- Use the org.apache.camel.builder.ThreadPoolBuilder utility to build the thread pool.
- Use the org.apache.camel.spi.ExecutorServiceManager instance from the current CamelContext to create the thread pool.
Ultimately, there is not much difference between the two approaches, because the ThreadPoolBuilder is actually defined using the ExecutorServiceManager instance. Normally, the ThreadPoolBuilder is preferred, because it offers a simpler approach. But there is at least one kind of thread pool (the ScheduledExecutorService) that can only be created by accessing the ExecutorServiceManager instance directly.
Table 2.10, “Thread Pool Builder Options” shows the options supported by the ThreadPoolBuilder class, which you can set when defining a new custom thread pool.
Builder Option | Description |
---|---|
maxQueueSize() | Sets the maximum number of pending tasks that this thread pool can store in its incoming task queue. A value of -1 specifies an unbounded queue. Default value is taken from default thread pool profile. |
poolSize() | Sets the minimum number of threads in the pool (this is also the initial pool size). Default value is taken from default thread pool profile. |
maxPoolSize() | Sets the maximum number of threads that can be in the pool. Default value is taken from default thread pool profile. |
keepAliveTime() | If any threads are idle for longer than this period of time (specified in seconds), they are terminated. This allows the thread pool to shrink when the load is light. Default value is taken from default thread pool profile. |
rejectedPolicy() | Specifies what course of action to take if the incoming task queue is full. You can specify four possible values: CallerRuns, Abort, Discard, or DiscardOldest. |
build() | Finishes building the custom thread pool and registers the new thread pool under the ID specified as the argument to build(). |
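The effect of a rejected policy is easiest to see in a small experiment. The following sketch uses the JDK's ThreadPoolExecutor.CallerRunsPolicy, assumed here to behave like the CallerRuns setting, with a pool of one thread and a queue of one slot. The class and method names are hypothetical. The third task arrives while the worker is busy and the queue is full, so it runs on the submitting thread instead of being dropped.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demonstration class; uses only the JDK.
class CallerRunsSketch {

    // Returns the name of the thread that ran the task submitted while the queue was full.
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // occupies the single worker thread
            pool.execute(blocked); // waits in the queue, which is now full
            // No free thread and no queue slot: the policy runs this task in the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

A side effect of this policy is that the submitting thread is busy while it runs the task, which slows down further submissions and so acts as a simple form of back-pressure.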
In Java DSL, you can define a custom thread pool using the ThreadPoolBuilder, as follows:
// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...

from("direct:start")
    .multicast().executorService(customPool)
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
Instead of passing the object reference, customPool, directly to the executorService() option, you can look up the thread pool in the registry, by passing its bean ID to the executorServiceRef() option, as follows:
// Java
from("direct:start")
    .multicast().executorServiceRef("customPool")
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
In XML DSL, you access the ThreadPoolBuilder using the threadPool element. You can then reference the custom thread pool using the executorServiceRef attribute to look up the thread pool by ID in the Spring registry, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <threadPool id="customPool"
              poolSize="5"
              maxPoolSize="5"
              maxQueueSize="100" />
  <route>
    <from uri="direct:start"/>
    <multicast executorServiceRef="customPool">
      <to uri="mock:first"/>
      <to uri="mock:second"/>
      <to uri="mock:third"/>
    </multicast>
  </route>
</camelContext>
Creating a custom thread pool profile
If you have many custom thread pool instances to create, you might find it more convenient to define a custom thread pool profile, which acts as a factory for thread pools. Whenever you reference a thread pool profile from a threading-aware processor, the processor automatically uses the profile to create a new thread pool instance. You can define a custom thread pool profile either in Java DSL or in XML DSL.
For example, in Java DSL you can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:
// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
    .multicast().executorServiceRef("customProfile")
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");
In XML DSL, use the threadPoolProfile element to create a custom pool profile (where you let the defaultProfile option default to false, because this is not a default thread pool profile). You can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <threadPoolProfile
      id="customProfile"
      poolSize="5"
      maxPoolSize="5"
      maxQueueSize="100" />
  <route>
    <from uri="direct:start"/>
    <multicast executorServiceRef="customProfile">
      <to uri="mock:first"/>
      <to uri="mock:second"/>
      <to uri="mock:third"/>
    </multicast>
  </route>
</camelContext>
Sharing a thread pool between components
Some of the standard poll-based components—such as File and FTP—allow you to specify the thread pool to use. This makes it possible for different components to share the same thread pool, reducing the overall number of threads in the JVM.
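The thread-saving effect of sharing can be illustrated without Apache Camel. In the following sketch, two hypothetical pollers (standing in for, say, a File consumer and an FTP consumer) are scheduled on one shared ScheduledExecutorService, so both run on the same single worker thread. The class and method names are assumptions made for this example, and the component options used to pass in a shared pool are not shown here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration class; uses only the JDK.
class SharedPoolSketch {

    // Schedules two pollers on one pool and returns the names of the threads they ran on.
    static Set<String> pollWithSharedPool() {
        ScheduledExecutorService shared = Executors.newScheduledThreadPool(1);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch polled = new CountDownLatch(2);
        Runnable poller = () -> {
            threads.add(Thread.currentThread().getName());
            polled.countDown();
        };
        try {
            shared.scheduleWithFixedDelay(poller, 0, 50, TimeUnit.MILLISECONDS); // first poller
            shared.scheduleWithFixedDelay(poller, 0, 50, TimeUnit.MILLISECONDS); // second poller
            polled.await(5, TimeUnit.SECONDS);
            return threads;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            shared.shutdownNow();
        }
    }
}
```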
Customizing thread names
To make the application logs more readable, it is often a good idea to customize the thread names (which are used to identify threads in the log). To customize thread names, you can configure the thread name pattern by calling the setThreadNamePattern method on the ExecutorServiceStrategy instance or the ExecutorServiceManager instance. Alternatively, an easier way to set the thread name pattern is to set the threadNamePattern property on the CamelContext object.
The following placeholders can be used in a thread name pattern:
#camelId#
- The name of the current CamelContext.
#counter#
- A unique thread identifier, implemented as an incrementing counter.
#name#
- The regular Camel thread name.
#longName#
- The long thread name, which can include endpoint parameters and so on.
The following is a typical example of a thread name pattern:
Camel (#camelId#) thread #counter# - #name#
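To show how such a pattern turns into actual thread names, the following sketch expands the placeholders by simple string replacement and wraps the result in a java.util.concurrent.ThreadFactory. It is an illustration only: the class ThreadNamePatternSketch and its methods are hypothetical, it is not how Apache Camel itself resolves the pattern, and the #longName# placeholder is left out for brevity.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper; not Camel's own implementation.
class ThreadNamePatternSketch {
    private final String camelId;
    private final String pattern;
    private final AtomicLong counter = new AtomicLong();

    ThreadNamePatternSketch(String camelId, String pattern) {
        this.camelId = camelId;
        this.pattern = pattern;
    }

    // Expands #camelId#, #counter#, and #name#; #longName# is not handled in this sketch.
    String nextName(String name) {
        return pattern
                .replace("#camelId#", camelId)
                .replace("#counter#", Long.toString(counter.incrementAndGet()))
                .replace("#name#", name);
    }

    // Thread factory that names each new thread from the pattern.
    ThreadFactory factoryFor(String name) {
        return task -> new Thread(task, nextName(name));
    }

    public static void main(String[] args) {
        ThreadNamePatternSketch names = new ThreadNamePatternSketch(
                "camel-1", "Camel (#camelId#) thread #counter# - #name#");
        System.out.println(names.nextName("seda")); // Camel (camel-1) thread 1 - seda
    }
}
```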
The following example shows how to set the threadNamePattern attribute on a Camel context using XML DSL:
<camelContext xmlns="http://camel.apache.org/schema/spring"
              threadNamePattern="Riding the thread #counter#">
  <route>
    <from uri="seda:start"/>
    <to uri="log:result"/>
    <to uri="mock:result"/>
  </route>
</camelContext>