第34章 分散実行
34.1. 分散実行
Red Hat JBoss Data Grid は、標準の JDK ExecutorService
インターフェースより分散実行を提供します。実行のために提出されたタスクは、ローカル JVM ではなく、JBoss Data Grid ノードのクラスター全体で実行されます。
JBoss Data Grid の分散タスクエクゼキューターは、JBoss Data Grid キャッシュノードからのデータを実行タスクの入力として使用できます。そのため、中間または最終結果のキャッシュストアを設定する必要がありません。JBoss Data Grid の入力データはすでに負荷分散されているため、タスクも自動的に分散されます。そのため、明示的にタスクを特定のノードに割り当てる必要はありません。
JBoss Data Grid の分散実行フレームワークでは、以下が行われます。
-
各
DistributedExecutorService
は単一のキャッシュにバインドされます。提出されたタスクがDistributedCallable
のインスタンスである場合、そのタスクは特定のキャッシュからキーバリューペアにアクセスできます。 -
タスクの 1 つが実行されるときにタスクが他のノードに移行されないようにするため、提出された各
Callable
、Runnable
、DistributedCallable
は、Serializable
またはExternalizable
である必要があります。Callable
から返された値もSerializable
またはExternalizable
である必要があります。
34.2. 分散エクゼキューターサービス
DistributedExecutorService
は DistributedCallable
と、クラスターの他の Callable
および Runnable
クラスの実行を制御します。これらのインスタンスは、インストール時に渡される特定キャッシュに関係します。
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedCallableAPI の説明にあるとおり、DistributedCallable
が拡張されると、DistributedTask
はキーのサブセットに対してのみ実行できます。タスクがこのように単一ノードへ提出されると、JBoss Data Gridは指定のキーが含まれるノードを見つけます。さらに、DistributedCallable
をこのノードに移行し、CompletableFuture
を返します。タスクがこのように利用可能なノードすべてに提出された場合、指定のキーが含まれるノードのみがタスクを受け取ります。
DistributedTask
が作成されたら、以下のメソッドの 1 つを使ってクラスターに提出することができます。
submitEverywhere
メソッドを使用すると、クラスターの利用可能なノードすべてとキーバリューペアすべてにタスクを提出できます。des.submitEverywhere(task)
submitEverywhere
メソッドは、キーのセットを引数として取ることもできます。このようにキーを渡すと、指定のキーが含まれる利用可能なノードのみにタスクが提出されます。des.submitEverywhere(task, $KEY)
キーが指定された場合、タスクは指定されたキーが 1 つ以上含まれる単一のノードで実行されます。ローカルにないキーは、クラスターから取得されます。このバージョンの
submit
メソッドは、以下の例のように操作する 1 つ以上のキーを受け入れます。des.submit(task, $KEY) des.submit(task, $KEY1, $KEY2, $KEY3)
ノードの
Address
をsubmit
メソッドに渡すと、特定ノードの実行を指示できます。以下は、クラスターのCoordinator
でのみ実行されます。des.submit(cache.getCacheManager().getCoordinator(), task)
注記デフォルトではタスクは自動的に分散されるため、通常は、実行の対象となる特定のノードを指定する必要はありません。
34.3. DistributedCallable API
DistributedCallable
インターフェースは、java.util.concurrent.package からの既存の Callable
のサブタイプです。リモート JVM で実行でき、Red Hat JBoss Data Grid からの入力を受け取ることができます。DistributedCallable
インターフェースは、JBoss Data Grid のキャッシュデータにアクセスする必要があるタスクのために使用されます。
DistributedCallable
API を使用してタスクを実行する場合、タスクのメインアルゴリズムに変更はありませんが、入力ソースは変更になります。
キャッシュへのアクセスや渡したキーセットへのアクセスが必要な場合、Callable
インターフェースをすでに実装済みのユーザーは DistributedCallable
を拡張する必要があります。
DistributedCallable
API の使用
public interface DistributedCallable<K, V, T> extends Callable<T> { /** * Invoked by execution environment after DistributedCallable * has been migrated for execution to a specific Infinispan node. * * @param cache * cache whose keys are used as input data for this * DistributedCallable task * @param inputKeys * keys used as input for this DistributedCallable task */ public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys); }
34.4. Callable および CDI
DistributedCallable
を実装できない場合または DistributedCallable
が適切でない場合に DistributedExecutorService
で使用される入力キャッシュへの参照が必要であれば、CDI メカニズムによって入力キャッシュをインジェクトするオプションを使用できます。
Callable
タスクが Red Hat JBoss Data Grid の実行ノードに到達すると、JBoss Data Grid の CDI メカニズムは適切なキャッシュ参照を提供し、実行している Callable
にインジェクトします。
Callable
を用いて JBoss Data Grid CDI を使用するには、以下の手順に従います。
-
Callable
でCache
フィールドを宣言し、org.infinispan.cdi.Input
でアノテーションを付けます。 -
必須の
@Inject
アノテーションを含めます。
Callable
および CDI の使用
public class CallableWithInjectedCache implements Callable<Integer>, Serializable { @Inject @Input private Cache<String, String> cache; @Override public Integer call() throws Exception { //use injected cache reference return 1; } }
34.5. 分散タスクのフェイルオーバー
Red Hat JBoss Data Grid の分散実行フレームワークは、以下の場合でタスクのフェイルオーバーをサポートします。
- タスクが実行されている場所でノードの障害によるフェイルオーバーが発生した場合。
-
タスクの障害によるフェイルオーバー。たとえば、
Callable
タスクによって例外が発生した場合。
フェイルオーバーポリシーはデフォルトで無効になっており、Runnable
、Callable
、および DistributedCallable
タスクはフェイルオーバーメカニズムを呼び出しせずに失敗します。
JBoss Data Grid は、ランダムノードフェイルオーバーポリシーを提供し、利用可能な場合に別のランダムなノードで Distributed
タスクの一部を実行します。
たとえば、以下を使用してランダムフェイルオーバー実行ポリシーを指定できます。
ランダムフェイルオーバー実行ポリシー
DistributedExecutorService des = new DefaultExecutorService(cache); DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable()); taskBuilder.failoverPolicy(DefaultExecutorService.RANDOM_NODE_FAILOVER); DistributedTask<Boolean> distributedTask = taskBuilder.build(); Future<Boolean> future = des.submit(distributedTask); Boolean r = future.get();
DistributedTaskFailoverPolicy
インターフェースを実行してフェイルオーバー管理を提供することもできます。
分散タスクのフェイルオーバーポリシーインターフェース
/** * DistributedTaskFailoverPolicy allows pluggable fail over target selection for a failed remotely * executed distributed task. * */ public interface DistributedTaskFailoverPolicy { /** * As parts of distributively executed task can fail due to the task itself throwing an exception * or it can be an Infinispan system caused failure (e.g node failed or left cluster during task * execution etc). * * @param failoverContext * the FailoverContext of the failed execution * @return result the Address of the Infinispan node selected for fail over execution */ Address failover(FailoverContext context); /** * Maximum number of fail over attempts permitted by this DistributedTaskFailoverPolicy * * @return max number of fail over attempts */ int maxFailoverAttempts(); }
34.6. 分散タスク実行ポリシー
DistributedTaskExecutionPolicy
は、ノードのサブセットへのタスクの実行をスコープ指定することで、タスクが Red Hat JBoss Data Grid クラスター全体でカスタム実行ポリシーを指定できるようにします。
たとえば、DistributedTaskExecutionPolicy
を使用すると、以下の場合でタスクの実行を管理できます。
- バックアップリモートネットワークセンターではなく、ローカルネットワークの場所でのみタスクが実行される場合。
- タスクの実行に特定の JBoss Data Grid ラックノードの専用サブセットのみが必要な場合。
ラックノードを使用した特定タスクの実行
DistributedExecutorService des = new DefaultExecutorService(cache); DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable()); taskBuilder.executionPolicy(DistributedTaskExecutionPolicy.SAME_RACK); DistributedTask<Boolean> distributedTask = taskBuilder.build(); Future<Boolean> future = des.submit(distributedTask); Boolean r = future.get();
34.7. 分散実行とローカリティー
分散環境の所有権では、DistributionManager
と ConsistentHash
は理論的で、これらのクラスはデータがキャッシュでアクティブであるかを認識しません。代わりに、これらのクラスは指定のキーを格納するノードを判断するために使用されます。
指定のキーのローカリティーを確認するには、以下のオプションの 1 つを使用します。
オプション 1: 以下の例のように、キーがキャッシュにあり、そのキーがローカルであることを
DistributionManager
が示すことを確認します。(cache.getAdvancedCache().withFlags(SKIP_REMOTE_LOOKUP).containsKey(key) && cache.getAdvancedCache().getDistributionManager().getLocality(key).isLocal())
オプション 2: 直接
DataContainer
をクエリーします。cache.getAdvancedCache().getDataContainer().containsKey(key)
エントリーがパッシベートされた場合、キーの存在に関わらず DataContainer
は False
を返します。
34.7.1. 分散実行の例
この例では、Pi () の近似値の算出に並列分散実行が使用されます。
- 以下に示された正方形の面積:
- 円の面積の方程式:
- 最初の式から r を分離:
- r のこの値を 2 番目の式に挿入し、Pi の値を算出:
- 式の結果を分離:
図34.1 分散実行の例
正方形に大量のダーツを投げ、その正方形の中に円を描き、円の内部に突き刺さったすべてのダーツを破棄すると、円/正方形の近似値を算出できます。
以前、Pi の値は 4C/S と算出されました。この値を使用して Pi の近似値を求めることができます。投げるダーツの数を最大限にすると近似値がより正確になります。
以下の例では、クラスター全体で並列処理して 1 千万本のダーツを投げます。
分散実行の例
public class PiAppx { public static void main (String [] arg){ List<Cache> caches = ...; Cache cache = ...; int numPoints = 10000000; int numServers = caches.size(); int numberPerWorker = numPoints / numServers; DistributedExecutorService des = new DefaultExecutorService(cache); long start = System.currentTimeMillis(); CircleTest ct = new CircleTest(numberPerWorker); List<Future<Integer>> results = des.submitEverywhere(ct); int countCircle = 0; for (Future<Integer> f : results) { countCircle += f.get(); } double appxPi = 4.0 * countCircle / numPoints; System.out.println("Distributed PI appx is " + appxPi + " completed in " + (System.currentTimeMillis() - start) + " ms"); } private static class CircleTest implements Callable<Integer>, Serializable { /** The serialVersionUID */ private static final long serialVersionUID = 3496135215525904755L; private final int loopCount; public CircleTest(int loopCount) { this.loopCount = loopCount; } @Override public Integer call() throws Exception { int insideCircleCount = 0; for (int i = 0; i < loopCount; i++) { double x = Math.random(); double y = Math.random(); if (insideCircle(x, y)) insideCircleCount++; } return insideCircleCount; } private boolean insideCircle(double x, double y) { return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2)) <= Math.pow(0.5, 2); } } }