30.4. Distributed Streams
Distributed Streams work similarly to map reduce; however, with Distributed Streams there are zero to many intermediate operations followed by a single terminal operation that is sent to each node where work is performed. The following steps are used for this behavior:
- The desired segments are grouped by which node is the primary owner of each given segment.
- A request is generated for each remote node. This request contains the intermediate and terminal operations, along with the segments to process.
- The thread where the terminal operation was initiated will perform the local operation directly.
- Each remote node will receive the generated request, run the operations on a remote thread, and then send the response back.
- Once all requests complete, the user thread gathers all responses and performs any reductions specified by the operations.
- The final response is returned to the user.
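The steps above can be sketched in plain Java. This is only an in-process simulation and does not use the JBoss Data Grid API: the class name, the method names, and the node names are all hypothetical, and the "remote" call is an ordinary method invocation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical simulation of the request flow; not part of any real API.
public class DistributedStreamSketch {

    /** Step 1: group segment ids by the node that is their primary owner. */
    static Map<String, List<Integer>> groupByOwner(Map<Integer, String> primaryOwnerOfSegment) {
        Map<String, List<Integer>> segmentsByNode = new TreeMap<>();
        primaryOwnerOfSegment.forEach((segment, node) ->
                segmentsByNode.computeIfAbsent(node, n -> new ArrayList<>()).add(segment));
        return segmentsByNode;
    }

    /**
     * Remaining steps: apply the terminal operation once per node to that
     * node's segments, then reduce the per-node responses on the caller.
     */
    static <R> R execute(Map<String, List<Integer>> segmentsByNode,
                         Function<List<Integer>, R> terminalOperation,
                         R identity,
                         BinaryOperator<R> reducer) {
        R result = identity;
        for (List<Integer> segments : segmentsByNode.values()) {
            // In a real cluster this call would run on the owning node.
            R response = terminalOperation.apply(segments);
            result = reducer.apply(result, response);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> owners = new TreeMap<>();
        owners.put(0, "nodeA");
        owners.put(1, "nodeB");
        owners.put(2, "nodeA");
        owners.put(3, "nodeC");

        Map<String, List<Integer>> grouped = groupByOwner(owners);
        System.out.println(grouped); // {nodeA=[0, 2], nodeB=[1], nodeC=[3]}

        // Terminal operation: count segments per node; reduction: sum.
        long segmentCount = execute(grouped, segments -> (long) segments.size(), 0L, Long::sum);
        System.out.println(segmentCount); // 4
    }
}
```

The sketch runs the per-node work sequentially for readability; in the real flow the remote requests proceed concurrently while the initiating thread handles the local segments.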
30.4.1. Marshallability
When using distributed or replicated caches, the keys and values must be marshallable; in addition, operations executed on Distributed Streams must also be marshallable, as these operations are sent to the other nodes in the cluster. This is most commonly accomplished by using a new class that is either Serializable or has an Externalizer registered; however, when the functional interface that a lambda is assigned to itself extends Serializable, the lambda is serializable automatically and no additional cast is required.
Note
Intermediate values in distributed streams do not need to be marshallable; only the final value sent back, typically the terminal operation, must be marshallable.
If a lambda is passed as a functional interface that is not itself serializable, it may be made serializable by casting the parameter to an intersection type that includes Serializable. For instance, consider a cache that stores Book entries; the following would create a collection of Book instances that match a specific author:
List<Book> books = cache.keySet().stream()
    .filter((Serializable & Predicate<? super Book>) e -> e.getAuthor().equals("authorname"))
    .collect(toList());
Additionally, not all Collector instances produced by the Collectors class are marshallable by default. JBoss Data Grid includes org.infinispan.stream.CacheCollectors as a convenient way to use any combination of Collectors that function properly when marshalling is required.
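The effect of the Serializable intersection cast can be checked with plain Java serialization. The following is a minimal sketch that assumes Java serialization as the marshalling mechanism; the class and method names are hypothetical, and no cache is involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

// Hypothetical helper class used only for this illustration.
public class SerializableLambdaSketch {

    /** Returns true if Java serialization accepts the object, false otherwise. */
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** A lambda whose target type is Predicate only. */
    static Predicate<String> plainPredicate() {
        return author -> author.equals("authorname");
    }

    /** The same lambda with Serializable added through an intersection cast. */
    static Predicate<String> serializablePredicate() {
        return (Serializable & Predicate<String>) author -> author.equals("authorname");
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(plainPredicate()));        // false
        System.out.println(isJavaSerializable(serializablePredicate())); // true
    }
}
```

Both lambdas have identical bodies; only the target type decides whether the resulting object can be serialized.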
30.4.2. Parallelism
There are two different methods to parallelize streams:
- Parallel Streams - causing each operation to be executed in parallel on a single node
- Parallel Distribution - parallelizing the request so that it involves multiple nodes
By default, Distributed Streams enable parallel distribution; however, this may be further coupled with a parallel Stream, allowing concurrent operations to execute across multiple nodes, with multiple threads on each node.
To mark a Stream as parallel, it may either be obtained with parallelStream(), or parallelism may be enabled after obtaining the Stream by invoking parallel(). The following example shows both methods:
// Obtain a parallel Stream initially
List<Book> books = cache.keySet().parallelStream()
    [...]
// Create the initial stream and then invoke parallel
List<Book> books = cache.keySet().stream()
    .parallel()
    [...]
Note
Some operations, such as rehash aware iterator or forEach operations, have a sequential stream forced locally. Using parallel streams on these operations is not possible at this time.
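That the two ways of marking a stream as parallel are equivalent can be checked on an ordinary collection. The sketch below uses a plain java.util list in place of a cache key set, so it only illustrates per-node parallelism and says nothing about parallel distribution; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration using a plain List instead of a cache key set.
public class ParallelFlagSketch {

    /** Stream obtained as parallel from the start. */
    static boolean viaParallelStream(List<String> keys) {
        return keys.parallelStream().isParallel();
    }

    /** Sequential stream switched to parallel afterwards. */
    static boolean viaParallel(List<String> keys) {
        return keys.stream().parallel().isParallel();
    }

    /** Default stream, for comparison. */
    static boolean viaStream(List<String> keys) {
        return keys.stream().isParallel();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3");
        System.out.println(viaParallelStream(keys)); // true
        System.out.println(viaParallel(keys));       // true
        System.out.println(viaStream(keys));         // false
    }
}
```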
30.4.3. Distributed Operators
30.4.3.1. Terminal Operator Distributed Result Reductions
Each terminal operator is discussed below, along with how its distributed reduction works.
- allMatch: This operator is run on each node and then all results are combined using a logical AND operation locally to obtain the final value. If a normal stream operation returns early then this method will complete early as well.
- noneMatch, anyMatch: These operators are run on each node and then all results are combined using a logical OR operation locally to obtain the final value. If a normal stream operation returns early then these methods will complete early as well.
- collect: The collect method can perform a few extra steps. Similar to other methods, the remote node will perform everything as expected; however, instead of performing the final finisher operation it sends back the fully combined results. The local thread will then combine all local and remote results into a single value, to which the finisher is then applied. In addition, the final value does not need to be serializable, but the values produced by the supplier and combiner methods must be serializable.
- count: The count method simply adds the numbers received from each node.
- findAny, findFirst: The findAny method will return the first value found, regardless of whether it came from a remote or local node. This operation supports early termination, as once an initial value has been found no others will be processed. The findFirst method behaves similarly, but requires a sorted intermediate operation, which is described in Section 30.4.3.3, “Intermediate Operation Exceptions”.
- max, min: The max and min methods find the respective value on each node before a final reduction is performed locally to determine the true max or min across all nodes.
- reduce: The various reduce methods serialize the result as much as possible before accumulating the local and remote results together locally, combining if enabled. Due to this behavior a value returned from the combiner does not need to be serializable.
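The two-level reductions described above can be imitated with nested java.util streams, where each inner list stands for the entries owned by one node. This is a simplified sketch with hypothetical names: it does not model early termination, serialization, or real remote execution.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical simulation: each inner list plays the role of one node's data.
public class TerminalReductionSketch {

    /** allMatch per node, then a local logical AND over the node results. */
    static boolean allMatchEven(List<List<Integer>> nodes) {
        return nodes.stream()
                .map(node -> node.stream().allMatch(v -> v % 2 == 0))
                .reduce(true, Boolean::logicalAnd);
    }

    /** anyMatch per node, then a local logical OR over the node results. */
    static boolean anyMatchEven(List<List<Integer>> nodes) {
        return nodes.stream()
                .map(node -> node.stream().anyMatch(v -> v % 2 == 0))
                .reduce(false, Boolean::logicalOr);
    }

    /** count per node, then a local sum of the node counts. */
    static long count(List<List<Integer>> nodes) {
        return nodes.stream().mapToLong(node -> node.stream().count()).sum();
    }

    /** max per node, then a local max over the per-node maxima. */
    static Optional<Integer> max(List<List<Integer>> nodes) {
        return nodes.stream()
                .map(node -> node.stream().max(Comparator.naturalOrder()))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .max(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        List<List<Integer>> nodes = Arrays.asList(
                Arrays.asList(2, 4), Arrays.asList(6, 7), Arrays.asList(8));
        System.out.println(allMatchEven(nodes)); // false
        System.out.println(anyMatchEven(nodes)); // true
        System.out.println(count(nodes));        // 5
        System.out.println(max(nodes).get());    // 8
    }
}
```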
30.4.3.2. Key Based Rehash Aware Operators
Unlike the other terminal operators, each of the following operators requires a special type of rehash awareness to keep track of which keys per segment have been processed. This guarantees each key will be processed exactly once, for iterator and spliterator operators, or at least once, for forEach, even if cluster membership changes.
- iterator, spliterator: These operators return batches of entries when run on a remote node, where the next batch is only sent after the previous one is fully consumed. This behavior limits how many entries are retained in memory at any given time. The user node will keep track of which keys have been processed, and once a segment has completed those keys will be released from memory. Because of this behavior it is preferable to use sequential processing, allowing only a subset of segment keys to be held in memory instead of having keys from all nodes retained.
- forEach: While forEach returns batches, it only returns a batch after it has finished processing at least a batch worth of keys. This way the originating node knows which keys have been processed already, which reduces the possibility of processing the same entry again; however, it is possible to have the same set processed repeatedly if a node goes down unexpectedly. In this case the node could have been processing an uncompleted batch when it went down, resulting in the same batch being run again when the rehash failover operation occurs. Adding a node will not cause this issue, as the rehash failover does not occur until all responses are received.
The batch sizes of these operations are controlled by the same value, distributedBatchSize, on the CacheStream. If no value is set, it defaults to the chunkSize configured in state transfer. While larger values allow for larger batches, resulting in fewer returns, they also increase memory usage, so testing should be performed to determine an appropriate size for each application.
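The trade-off between batch size and the number of returns can be illustrated with a small helper that splits one node's keys into batches. This is a hypothetical sketch, not the mechanism JBoss Data Grid uses internally; the class and method names are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of how batch size affects the number of returns.
public class BatchSizeSketch {

    /** Splits keys into consecutive batches of at most batchSize elements. */
    static <K> List<List<K>> toBatches(List<K> keys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<K>> batches = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += batchSize) {
            int end = Math.min(start + batchSize, keys.size());
            batches.add(new ArrayList<>(keys.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Smaller batches: more returns, fewer keys held at once.
        System.out.println(toBatches(keys, 3).size()); // 4
        // Larger batches: fewer returns, more keys held at once.
        System.out.println(toBatches(keys, 5).size()); // 2
    }
}
```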
30.4.3.3. Intermediate Operation Exceptions
The following intermediate operations have special exceptions. All of these methods have some sort of artificial iterator implanted in the stream processing to guarantee correctness, and due to this using any of the following may cause severe performance degradation.
- Skip: An artificial iterator is implanted up to the skip operation, and then results are brought locally so that the appropriate number of elements may be skipped.
- Peek: An artificial iterator is implanted up to the peek operation. Only up to a number of peeked elements are returned by a remote node, and then results are brought locally so that it may peek at only the desired amount.
- Sorted: An artificial iterator is implanted up to the sorted operation, and then all results are locally sorted.
Warning
This operation requires having all entries in memory on the local node.
- Distinct: Distinct is performed on each remote node and then an artificial iterator returns those distinct values, before all of those results have a distinct operation performed upon them.
Warning
This operation requires having all entries in memory on the local node.
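The two-phase handling of distinct and the local handling of sorted can be imitated with plain java.util streams. In this hypothetical sketch each inner list stands for one node's values; the artificial iterator and remote transport are not modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical simulation: each inner list plays the role of one node's values.
public class IntermediateOperationSketch {

    /** distinct on each node first, then distinct again over the gathered results. */
    static List<String> distributedDistinct(List<List<String>> nodes) {
        return nodes.stream()
                .flatMap(node -> node.stream().distinct()) // per-node phase
                .distinct()                                // local phase
                .collect(Collectors.toList());
    }

    /** All values are gathered locally and only then sorted. */
    static List<String> distributedSorted(List<List<String>> nodes) {
        return nodes.stream()
                .flatMap(List::stream)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nodes = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "c"),
                Arrays.asList("c", "c", "d"));
        System.out.println(distributedDistinct(nodes)); // [a, b, c, d]
        System.out.println(distributedSorted(nodes));   // [a, a, b, b, c, c, c, d]
    }
}
```

The per-node distinct reduces how much is sent back, but the local phase still has to see every surviving value, which is why the whole result set ends up in memory on the local node.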
30.4.4. Distributed Stream Examples
A classic example of Map/Reduce is word count. Assuming we have a cache with String for keys and values, and we need to count the occurrences of all words in all sentences, this could be implemented using the following:
// Collectors.counting() produces Long values, so the result is a Map<String, Long>
Map<String, Long> wordCountMap = cache.entrySet().parallelStream()
    // Split each sentence into words
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    // Flatten the word arrays into a single stream of words
    .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
    // Group identical words and count them with a marshallable collector
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
If we wanted to revise the example to find the most frequent word we would need to have all words available and counted locally first. The following snippet extends our previous example to perform this search:
String mostFrequentWord = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.collectingAndThen(
        Collectors.groupingBy(Function.identity(), Collectors.counting()),
        wordCountMap -> {
            // Finisher: scan the combined counts for the largest one
            String mostFrequent = null;
            long maxCount = 0;
            for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                long count = e.getValue();
                if (count > maxCount) {
                    maxCount = count;
                    mostFrequent = e.getKey();
                }
            }
            return mostFrequent;
        })));
At present, this last step will be executed in a single thread. We can further optimize this operation by using a parallel stream locally to perform the final operation:
// Count the words across the cluster
Map<String, Long> wordCount = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
// Find the most frequent word locally, using a parallel stream over the result map
Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream()
    .reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
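To experiment with the shape of this pipeline without a running cluster, the same operations can be applied to an ordinary Map. The following sketch is a local stand-in only: it uses plain java.util streams, needs no Serializable casts or CacheCollectors because nothing leaves the JVM, and its class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical local stand-in for the cache-based word count.
public class LocalWordCountSketch {

    /** Splits every value into words and counts each distinct word. */
    static Map<String, Long> countWords(Map<String, String> sentences) {
        return sentences.entrySet().stream()
                .map(e -> e.getValue().split("\\s"))
                .flatMap(Arrays::stream)
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    /** Reduces the counted entries to the word with the highest count. */
    static Optional<String> mostFrequent(Map<String, Long> counts) {
        return counts.entrySet().parallelStream()
                .reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2)
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, String> sentences = new LinkedHashMap<>();
        sentences.put("k1", "the quick fox");
        sentences.put("k2", "the lazy dog");
        sentences.put("k3", "the fox");

        Map<String, Long> counts = countWords(sentences);
        System.out.println(counts.get("the"));          // 3
        System.out.println(counts.get("fox"));          // 2
        System.out.println(mostFrequent(counts).get()); // the
    }
}
```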