2.8. スレッドモデル
Java スレッドプール API
Apache Camel のスレッドモデルは、強力な Java 並行処理 API パッケージ java.util.concurrent に基づいています。この API は、Sun の JDK 1.5 で初めて利用可能になったものです。この API の主要なインターフェースは、スレッドプールを表す ExecutorService
インターフェースです。並行処理 API を使用すると、幅広いシナリオに対応する、さまざまな種類のスレッドプールを作成できます。
Apache Camel スレッドプール API
Apache Camel スレッドプール API は Java 並行処理 API 上で構築されており、Apache Camel アプリケーションのすべてのスレッドプールに対して中心的なファクトリー (org.apache.camel.spi.ExecutorServiceManager
型) が提供されます。このような方法でスレッドプールの作成を一元化することには、以下のようにいくつかの利点があります。
- ユーティリティークラスを使用することで、スレッドプールの作成を簡素化できる。
- スレッドプールを正常なシャットダウンに統合できる。
- スレッドに有益な名前を自動的に付与できる。これは、ロギングと管理に役立ちます。
コンポーネントのスレッドモデル
SEDA、JMS、Jetty など、Apache Camel コンポーネントには本質的にマルチスレッドなものがあります。こうしたコンポーネントはすべて Apache Camel のスレッドモデルとスレッドプール API を使用して実装されています。
独自の Apache Camel コンポーネントを実装しようとしている場合、マルチスレッドなコードは Apache Camel のスレッドモデルと統合することが推奨されます。たとえば、コンポーネントにスレッドプールが必要な場合は、CamelContext の ExecutorServiceManager
オブジェクトを使用して作成することが推奨されます。
プロセッサーのスレッドモデル
Apache Camel の標準プロセッサーの中には、デフォルトで独自のスレッドプールを作成するものがあります。これらのスレッド対応プロセッサーも Apache Camel のスレッドモデルと統合されており、使用するスレッドプールのカスタマイズを可能にするさまざまなオプションを提供しています。
表2.8「プロセッサーのマルチスレッドオプション」 では、Apache Camel に組み込まれているスレッド対応のプロセッサーでスレッドプールを制御および設定するためのさまざまなオプションを示しています。
プロセッサー | Java DSL | XML DSL |
---|---|---|
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
parallelProcessing() executorService() executorServiceRef() |
@parallelProcessing @executorServiceRef |
|
executorService() executorServiceRef() poolSize() maxPoolSize() keepAliveTime() timeUnit() maxQueueSize() rejectedPolicy() |
@executorServiceRef @poolSize @maxPoolSize @keepAliveTime @timeUnit @maxQueueSize @rejectedPolicy |
|
wireTap(String uri, ExecutorService executorService) wireTap(String uri, String executorServiceRef) |
@executorServiceRef |
threads DSL オプション
threads
プロセッサーは汎用の DSL コマンドであり、ルートにスレッドプールを導入するために使用できます。スレッドプールをカスタマイズするために、以下のオプションをサポートしています。
poolSize()
- プールの最小スレッド数 (および初期プールサイズ)。
maxPoolSize()
- プールの最大スレッド数。
keepAliveTime()
- スレッドがこの期間 (秒単位で指定) よりも長い間アイドル状態になっている場合、スレッドを終了させる。
timeUnit()
-
keep alive の時間単位。
java.util.concurrent.TimeUnit
タイプを使用して指定します。 maxQueueSize()
- このスレッドプールが受信タスクキューに保持できる保留中の最大タスク数。
rejectedPolicy()
- 受信タスクキューが満杯の場合に実行すべきアクションを指定する。表2.10「スレッドプールビルダーのオプション」 を参照してください。
前述のスレッドプールのオプションは、executorServiceRef
オプションと互換性が ありません (たとえば、これらのオプションを使用して、executorServiceRef
オプションで参照されるスレッドプールの設定を上書きすることはできません)。この DSL の制約は Apache Camel が検証することで強制されます。
デフォルトのスレッドプールの作成
スレッド対応プロセッサーに対してデフォルトのスレッドプールを作成するには、parallelProcessing
オプションを有効にします。Java DSL では parallelProcessing()
サブ句を、XML DSL では parallelProcessing
属性を使用します。
たとえば、Java DSL では、以下のようにデフォルトのスレッドプールを使用して multicast プロセッサーを呼び出すことができます (スレッドプールはマルチキャストの宛先を同時に処理するために使用されます)。
from("direct:start") .multicast().parallelProcessing() .to("mock:first") .to("mock:second") .to("mock:third");
以下のように XML DSL で同じルートを定義できます。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <multicast parallelProcessing="true"> <to uri="mock:first"/> <to uri="mock:second"/> <to uri="mock:third"/> </multicast> </route> </camelContext>
デフォルトスレッドプールプロファイルの設定
デフォルトのスレッドプールは、スレッドファクトリーがデフォルトスレッドプールプロファイル から設定を取得することによって自動的に作成されます。デフォルトのスレッドプールプロファイルには、表2.9「デフォルトスレッドプールプロファイルの設定」 に記載されている設定があります(これらの設定はアプリケーションコードによって変更されていないことを前提とします)。
スレッドオプション | デフォルト値 |
---|---|
|
|
|
|
|
|
|
|
|
|
デフォルトスレッドプールプロファイルの変更
デフォルトスレッドプールプロファイルの設定を変更することで、後続のすべてのデフォルトスレッドプールをカスタムの設定で作成することができます。プロファイルは Java または Spring XML のどちらでも変更できます。
たとえば、Java DSL では、以下のようにデフォルトスレッドプールプロファイルの poolSize
オプションと maxQueueSize
オプションをカスタマイズできます。
// Java import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.ThreadPoolProfile; ... ExecutorServiceManager manager = context.getExecutorServiceManager(); ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile(); // Now, customize the profile settings. defaultProfile.setPoolSize(3); defaultProfile.setMaxQueueSize(100); ...
XML DSL では、以下のようにデフォルトスレッドプールプロファイルをカスタマイズできます。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<threadPoolProfile
id="changedProfile"
defaultProfile="true"
poolSize="3"
maxQueueSize="100"/>
...
</camelContext>
前述の XML DSL の例では defaultProfile
属性を true
に設定することが不可欠です。そうしないと、そのスレッドプールプロファイルはデフォルトスレッドプールプロファイルを置き換えるのではなく、カスタムのスレッドプールプロファイルとして扱われてしまいます (「カスタムスレッドプールプロファイルの作成」 を参照)。
プロセッサーのスレッドプールのカスタマイズ
executorService
または executorServiceRef
オプションのいずれかを使用 (parallelProcessing
オプションの代わりにこれらのオプションを使用) することで、スレッド対応のプロセッサーにスレッドプールを直接指定することもできます。以下のように、プロセッサーのスレッドプールをカスタマイズするには 2 つの方法があります。
-
カスタムスレッドプールの値の指定 -
ExecutorService
(スレッドプール) インスタンスを明示的に作成し、これをexecutorService
オプションに渡します。 -
カスタムスレッドプールプロファイルの指定 - カスタムスレッドプールファクトリーを作成して登録します。
executorServiceRef
オプションを使用してこのファクトリーを参照すると、プロセッサーは自動的にそのファクトリーを使用して、カスタムスレッドプールインスタンスを作成します。
Bean ID を executorServiceRef
オプションに渡すと、スレッド対応のプロセッサーはまずその ID を持つカスタムスレッドプールをレジストリーの中から検索します。その ID でスレッドプールが登録されていない場合、プロセッサーはレジストリーの中からカスタムスレッドプールプロファイルを検索し、そのカスタムスレッドプールプロファイルを使用してカスタムスレッドプールをインスタンス化します。
カスタムスレッドプールの作成
カスタムスレッドプールは、java.util.concurrent.ExecutorService 型の任意のスレッドプールです。Apache Camel では、スレッドプールインスタンスを作成する上で以下の方法が推奨されています。
-
org.apache.camel.builder.ThreadPoolBuilder
ユーティリティーを使用して、スレッドプールクラスをビルドします。 -
現在の
CamelContext
からorg.apache.camel.spi.ExecutorServiceManager
インスタンスを使用して、スレッドプールクラスを作成します。
ThreadPoolBuilder
は実際には ExecutorServiceManager
インスタンスを使用して定義されているため、究極的には 2 つのアプローチには大きな違いはありません。通常、ThreadPoolBuilder
が推奨されます。これは、より単純なアプローチを提供するためです。ただし、ExecutorServiceManager
インスタンスディレクトリーにアクセスすることでのみ作成できる 1 種類のスレッド( ScheduledExecutorService
)があります。
表2.10「スレッドプールビルダーのオプション」 は、ThreadPoolBuilder
クラスがサポートするオプションを示します。これらのオプションは、新しいカスタムスレッドプールを定義する際に設定できます。
Builder オプション | 説明 |
---|---|
|
このスレッドプールが受信タスクキューに保持できる保留中の最大タスク数を設定します。 |
| プールの最小スレッド数を設定します (これは初期プールサイズにもなります)。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。 |
| プールで使用できる最大スレッド数を設定します。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。 |
| スレッドがこの期間 (秒単位で指定) よりも長い間アイドル状態になっている場合、スレッドを終了させる。これにより、負荷が軽くなるとスレッドプールが縮小されます。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。 |
| 受信タスクキューが満杯の場合に実行すべきアクションを指定する。以下の 4 つの値から指定できます。
|
|
カスタムスレッドプールの構築を終了し、 |
Java DSL では、以下のように ThreadPoolBuilder
を使用してカスタムスレッドプールを定義できます。
// Java import org.apache.camel.builder.ThreadPoolBuilder; import java.util.concurrent.ExecutorService; ... ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context); ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool"); ... from("direct:start") .multicast().executorService(customPool) .to("mock:first") .to("mock:second") .to("mock:third");
オブジェクト参照 customPool
を直接 executorService()
オプションに渡す代わりに、以下のように Bean ID を executorServiceRef()
オプションに渡すことで、レジストリーの中からスレッドプールを検索できます。
// Java from("direct:start") .multicast().executorServiceRef("customPool") .to("mock:first") .to("mock:second") .to("mock:third");
XML DSL では、threadPool
要素を使用して ThreadPoolBuilder
にアクセスします。その後、以下のように executorServiceRef
属性を使用して Spring レジストリーでスレッドプールを ID で検索することで、カスタムスレッドプールを参照できます。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <threadPool id="customPool" poolSize="5" maxPoolSize="5" maxQueueSize="100" /> <route> <from uri="direct:start"/> <multicast executorServiceRef="customPool"> <to uri="mock:first"/> <to uri="mock:second"/> <to uri="mock:third"/> </multicast> </route> </camelContext>
カスタムスレッドプールプロファイルの作成
多くのカスタムスレッドプールインスタンスを作成する場合は、スレッドプールのファクトリーとして機能するカスタムスレッドプールプロファイルを定義しておくと便利です。スレッド対応プロセッサーからスレッドプールプロファイルを参照するだけで、プロセッサーは自動的にそのプロファイルを使用して新しいスレッドプールインスタンスを作成します。カスタムスレッドプールプロファイルは、Java DSL または XML DSL のどちらでも定義できます。
たとえば、Java DSL では、以下のようにして Bean ID customProfile
を持つカスタムスレッドプールプロファイルを作成し、ルート内でそのプロファイルを参照できます。
// Java import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.impl.ThreadPoolProfileSupport; ... // Create the custom thread pool profile ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile"); customProfile.setPoolSize(5); customProfile.setMaxPoolSize(5); customProfile.setMaxQueueSize(100); context.getExecutorServiceManager().registerThreadPoolProfile(customProfile); ... // Reference the custom thread pool profile in a route from("direct:start") .multicast().executorServiceRef("customProfile") .to("mock:first") .to("mock:second") .to("mock:third");
XML DSL では、threadPoolProfile
要素を使用してカスタムプールプロファイルを作成します (これはデフォルトスレッドプールプロファイル ではない ため、ここでは defaultProfile
オプションをデフォルトで false
に設定します)。以下のようにして、Bean ID customProfile
を持つカスタムスレッドプールプロファイルを作成し、ルート内でそのプロファイルを参照できます。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <threadPoolProfile id="customProfile" poolSize="5" maxPoolSize="5" maxQueueSize="100" /> <route> <from uri="direct:start"/> <multicast executorServiceRef="customProfile"> <to uri="mock:first"/> <to uri="mock:second"/> <to uri="mock:third"/> </multicast> </route> </camelContext>
コンポーネント間でのスレッドプールの共有
File や FTP など、標準のポーリングベースのコンポーネントの中には、使用するスレッドプールを指定できるものがあります。これにより、異なるコンポーネントが同じスレッドプールを共有することが可能となり、JVM 内のスレッド総数を削減することができます。
たとえば、『 Apache Camel Component Reference Guide 』の「 File2 」および『 Apache Camel Component Reference Guide 』の「Ftp 2 」は、コンポーネントの ExecutorService
オブジェクトの指定に使用できる scheduledExecutorService
プロパティーを公開しています。
スレッド名のカスタマイズ
アプリケーションのログをより読みやすくするために、スレッド名 (ログ内でスレッドを識別するために使用されるもの) をカスタマイズすることが推奨されます。スレッド名をカスタマイズするには、ExecutorServiceStrategy
クラスまたは ExecutorServiceManager
クラスの setThreadNamePattern
メソッドを呼び出すことで、スレッド名パターン を設定します。または、スレッド名パターンを設定するより簡単な方法として、CamelContext
オブジェクトに threadNamePattern
プロパティーを設定する方法もあります。
スレッド名パターンでは、以下のプレースホルダーが使用できます。
#camelId#
-
現在の
CamelContext
の名前。 #counter#
- インクリメントカウンターとして実装された一意のスレッド ID。
#name#
- 通常の Camel スレッド名。
#longName#
- 長いスレッド名。エンドポイントパラメーターなどを含めることができる。
以下は、スレッド名パターンの典型的な例です。
Camel (#camelId#) thread #counter# - #name#
以下の例は、XML DSL を使用して Camel コンテキストに threadNamePattern
属性を設定する方法を示しています。
<camelContext xmlns="http://camel.apache.org/schema/spring"
threadNamePattern="Riding the thread #counter#" >
<route>
<from uri="seda:start"/>
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
</camelContext>