13.6. 配布/複製/散在
これは、ストリームがストライドになるところです。ストリーム操作が実行されると、関連データを持つ各ノードにさまざまな中間操作と端末操作が送信されます。これにより、データを所有するノードで中間値を処理し、最終結果を元のノードにのみ送信し、パフォーマンスが向上します。
13.6.1. 再ハッシュ対応
内部的にはデータがセグメント化され、各ノードはプライマリー所有者として所有するデータでのみ操作を実行します。これにより、セグメントが各ノードで等量のデータを提供するのに十分な粒度であると仮定して、データを均等に処理できます。
分散キャッシュを使用する場合には、新規ノードが加わったり、残ったりすると、データをノード間で再シャッフルすることができます。分散ストリームはこのデータの再シャッフルを自動的に処理するため、ノードがクラスターを離れたり、クラスターに参加したりするときの監視について心配する必要はありません。シャッフルされたエントリーは 2 回処理される可能性があり、重複処理の量を制限するために、キーレベルまたはセグメントレベル (端末操作に応じて) で処理されたエントリーを追跡します。
ストリームで再ハッシュ認識を無効にすることは可能ですが、推奨されません。これは、再ハッシュが発生したときに、リクエストがデータのサブセットの確認を処理できる場合に限り考慮する必要があります。これは、CacheStream.disableRehashAware() を呼び出すことで実行できます。再ハッシュが発生しない場合、ほとんどの操作のパフォーマンスの向上は、完全に無視できます。唯一の例外は、処理されたキーを追跡する必要がないため、使用するメモリーが少ない iterator と forEach です。
自分が何をしているかを本当に理解していない限り、再ハッシュ認識を無効にすることを再考してください。
13.6.2. シリアル化
操作は他のノード全体に送信されるため、Data Grid マーシャリングでシリアライズできる必要があります。これにより、他のノードに操作を送信できます。
最も簡単な方法は、CacheStream インスタンスを使用し、通常どおりラムダを使用することです。Data Grid は、さまざまな Stream 中間メソッドおよび端末メソッドをすべて上書きして、引数の Serializable バージョン(SerializableFunction、SerializablePredicate など)を取得します。)でこれらのメソッドを CacheStream で見つけることができます。これは、ここで定義されている最も具体的な方法を選択するための仕様に依存しています。
上記の例では、Collector
を使用してすべての結果を Map
に収集しました。ただし、Collector クラスは Serializable インスタンスを生成しません。そのため、これらを使用する必要がある場合は、2 つの方法があります。
1 つのオプションとして、Supplier<Collector>
の指定を可能にする CacheCollectors クラスを使用します。このインスタンスは、シリアライズされていない Collector
を提供するために、Collectors を使用することができます。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
または、CacheCollectors の使用を回避し、代わりに Supplier<Collector>
を取得するオーバーロードされた collect
メソッドを使用できます。オーバーロードされた collect
メソッドは CacheStream
インターフェイスでしか利用できません。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
ただし、Cache
および CacheStream
インターフェイスを使用できない場合は、Serializable
引数を使用できないため、ラムダを複数インターフェイスをキャストすることで、ラムダを Serializable
に手動でキャストする必要があります。優れた方法ではありませんが、設定することは可能です。
Map<Object, String> jbossValues = map.entrySet().stream() .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
推奨され最も高性能な方法は、最小限のペイロードを提供するために、AdvancedExternalizer
を使用することです。残念ながら、これは、高度なエクスターナライザーが事前にクラスを定義する必要があるため、ラムダを使用できないことを意味します。
以下に示すように、高度なエクスターナライザーを使用できます。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ContainsFilter implements Predicate<Map.Entry<Object, String>> { private final String target; ContainsFilter(String target) { this.target = target; } @Override public boolean test(Map.Entry<Object, String> e) { return e.getValue().contains(target); } } class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> { @Override public Set<Class<? extends ContainsFilter>> getTypeClasses() { return Util.asSet(ContainsFilter.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException { output.writeUTF(object.target); } @Override public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException { return new ContainsFilter(input.readUTF()); } }
コレクターサプライヤーに高度なエクスターナライザーを使用して、ペイロードサイズをさらに減らすこともできます。
Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> { static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier(); private ToMapCollectorSupplier() { } @Override public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() { return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue); } } class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> { @Override public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() { return Util.asSet(ToMapCollectorSupplier.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException { } @Override public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException { return ToMapCollectorSupplier.INSTANCE; } }