Chapter 10. Executing code in the Grid
The main benefit of a cache is the ability to look up a value by its key very quickly, even across machines. In fact, this use alone is probably the reason many users adopt Red Hat Data Grid. However, Red Hat Data Grid provides many more benefits that are not immediately apparent. Since Red Hat Data Grid is usually used in a cluster of machines, it also offers features that help you use the entire cluster to perform your workload.
This section covers only executing code in the grid using an embedded cache. If you are using a remote cache, see Executing code in the Remote Grid.
10.1. Cluster Executor
Since you have a group of machines, it makes sense to leverage their combined computing power by executing code on all of them. The cache manager comes with a utility that allows you to execute arbitrary code in the cluster. Note that this feature does not require a Cache. You retrieve this Cluster Executor by calling executor() on the EmbeddedCacheManager. The executor is available in both clustered and non-clustered configurations.
The ClusterExecutor is specifically designed for executing code where the code is not reliant upon the data in a cache and is used instead as a way to help users to execute code easily in the cluster.
This manager was built specifically around Java 8 and as such has functional APIs in mind, so all methods take a functional interface as an argument. Also, since these arguments are sent to other nodes, they need to be serializable. We even used a nice trick to ensure our lambdas are immediately Serializable: the method arguments implement both Serializable and the real argument type (i.e. Runnable or Function). The Java compiler picks the most specific method when determining which overload to invoke, so your lambdas will always be serializable. It is also possible to use an Externalizer to reduce message size further.
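The overload trick can be reproduced with plain JDK types. The sketch below is not Red Hat Data Grid code: `SerializableRunnable` and `describe` are hypothetical names chosen for illustration. It shows that when a method is overloaded for both `Runnable` and a sub-interface that also extends `Serializable`, a lambda argument binds to the more specific overload, while a value already typed as `Runnable` does not.

```java
import java.io.Serializable;

public class OverloadDemo {
    // Hypothetical interface for illustration: a Runnable that is also Serializable.
    interface SerializableRunnable extends Runnable, Serializable { }

    // Overload taking the plain functional interface.
    static String describe(Runnable task) {
        return "plain";
    }

    // Overload taking the more specific, serializable sub-interface.
    static String describe(SerializableRunnable task) {
        return "serializable";
    }

    public static void main(String[] args) {
        // A lambda matches both overloads; the most specific one is chosen.
        System.out.println(describe(() -> { }));

        // A variable whose static type is Runnable can only match the first overload.
        Runnable alreadyTyped = () -> { };
        System.out.println(describe(alreadyTyped));
    }
}
```

The practical consequence is that passing a lambda directly to such a method is enough; assigning it to a plain `Runnable` variable first loses the serializable target type.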
By default, the manager submits a given command to all nodes in the cluster, including the node it was submitted from. You can control which nodes the task is executed on by using the filterTargets methods, as explained in the next section.
10.1.1. Filtering execution nodes
It is possible to limit which nodes the command will run on. For example, you may want to run a computation only on machines in the same rack. Or you may want to perform an operation once in the local site and again on a different site. A cluster executor can limit which nodes it sends requests to at the scope of the same or a different machine, rack, or site.
SameRack.java
EmbeddedCacheManager manager = ...;
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)
To use this topology-based filtering, you must enable topology-aware consistent hashing through Server Hinting.
You can also filter using a Predicate based on the Address of the node, which lets the target nodes be chosen by any means. This can optionally be combined with the topology-based filtering in the previous code snippet to allow even finer control of where your code is executed within the cluster.
Predicate.java
EmbeddedCacheManager manager = ...;
// Just filter
manager.executor().filterTargets(a -> a.equals(..)).submit(...)
// Filter only those in the desired topology
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)
10.1.2. Timeout
Cluster Executor allows a timeout to be set per invocation. This defaults to the distributed sync timeout as configured on the Transport Configuration. The timeout works in both clustered and non-clustered cache managers. The executor may or may not interrupt the threads executing a task when the timeout expires. However, when the timeout occurs, any Consumer or Future is completed with a TimeoutException. This value can be overridden by invoking the timeout method and supplying the desired duration.
10.1.3. Single Node Submission
Cluster Executor can also run in single node submission mode: instead of submitting the command to all nodes, it picks one of the nodes that would normally have received the command and submits it to only that node. Each submission may use a different node to execute the task. This is very useful for using the ClusterExecutor as a java.util.concurrent.Executor, which you may have noticed ClusterExecutor implements.
SingleNode.java
EmbeddedCacheManager manager = ...;
manager.executor().singleNodeSubmission().submit(...)
10.1.3.1. Failover
When running in single node submission mode, it may be desirable to also allow the Cluster Executor to handle cases where an exception occurred during the processing of a given command by retrying the command. When this occurs, the Cluster Executor chooses a single node again and resubmits the command, up to the desired number of failover attempts. Note that the chosen node could be any node that passes the topology or predicate check. Failover is enabled by invoking the overloaded singleNodeSubmission method. The given command is resubmitted to a single node until either the command completes without exception or the total number of submissions equals the provided failover count.
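The retry rule can be modelled in plain Java. This is a simulation under stated assumptions, not the ClusterExecutor implementation: `submitWithFailover`, the `Outcome` holder, the node names, and the random choice of node are all hypothetical. It resubmits a task to one randomly chosen node until the task completes without exception or the total number of submissions reaches the failover count.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

public class FailoverSketch {
    /** Hypothetical holder: the task result plus the number of submissions used. */
    static final class Outcome<T> {
        final T value;
        final int attempts;

        Outcome(T value, int attempts) {
            this.value = value;
            this.attempts = attempts;
        }
    }

    /** Submits the task to a single node per attempt, at most failoverCount times in total. */
    static <T> Outcome<T> submitWithFailover(List<String> nodes, int failoverCount,
                                             Function<String, T> task, Random random) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= failoverCount; attempt++) {
            // Each submission may land on a different eligible node.
            String node = nodes.get(random.nextInt(nodes.size()));
            try {
                return new Outcome<>(task.apply(node), attempt);
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException(
                "all " + failoverCount + " submissions failed", lastFailure);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated task that fails on its first two invocations.
        Outcome<String> outcome = submitWithFailover(Arrays.asList("A", "B", "C"), 5, node -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated failure on " + node);
            }
            return "done on " + node;
        }, new Random());
        System.out.println(outcome.value + " after " + outcome.attempts + " submissions");
    }
}
```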
10.1.4. Example: PI Approximation
This example shows how you can use the ClusterExecutor to estimate the value of PI.
Pi approximation can greatly benefit from parallel distributed execution via Cluster Executor. Recall that the area of a square is Sa = 4r^2 and the area of its inscribed circle is Ca = pi * r^2. Substituting r^2 from the second equation into the first one, it turns out that pi = 4 * Ca/Sa. Now, imagine that we can shoot a very large number of darts into the square; if we take the ratio of darts that land inside the circle over the total number of darts shot, we approximate the value of Ca/Sa. Since we know that pi = 4 * Ca/Sa, we can easily derive an approximate value of pi. The more darts we shoot, the better the approximation we get. In the example below we shoot 1 billion darts, but instead of "shooting" them serially we parallelize the work of dart shooting across the entire Red Hat Data Grid cluster. Note this will work in a cluster of 1 as well, but will be slower.
public class PiAppx {

    public static void main(String[] arg) {
        EmbeddedCacheManager cacheManager = ..
        boolean isCluster = ..

        int numPoints = 1_000_000_000;
        int numServers = isCluster ? cacheManager.getMembers().size() : 1;
        int numberPerWorker = numPoints / numServers;

        ClusterExecutor clusterExecutor = cacheManager.executor();
        long start = System.currentTimeMillis();
        // We receive results concurrently - need to handle that
        AtomicLong countCircle = new AtomicLong();
        CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> {
            int insideCircleCount = 0;
            for (int i = 0; i < numberPerWorker; i++) {
                double x = Math.random();
                double y = Math.random();
                if (insideCircle(x, y))
                    insideCircleCount++;
            }
            return insideCircleCount;
        }, (address, count, throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
                System.out.println("Address: " + address + " encountered an error: " + throwable);
            } else {
                countCircle.getAndAdd(count);
            }
        });
        fut.whenComplete((v, t) -> {
            // This is invoked after all nodes have responded with a value or exception
            if (t != null) {
                t.printStackTrace();
                System.out.println("Exception encountered while waiting:" + t);
            } else {
                double appxPi = 4.0 * countCircle.get() / numPoints;

                System.out.println("Distributed PI appx is " + appxPi +
                        " using " + numServers + " node(s), completed in " +
                        (System.currentTimeMillis() - start) + " ms");
            }
        });

        // May have to sleep here to keep alive if no user threads left
    }

    private static boolean insideCircle(double x, double y) {
        return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2)) <= Math.pow(0.5, 2);
    }
}
10.2. Streams
You may want to process a subset or all of the data in the cache to produce a result. This may bring thoughts of MapReduce. Red Hat Data Grid allows you to do something very similar but uses the standard JRE APIs to do so. Java 8 introduced the concept of a Stream, which allows functional-style operations on collections rather than having to procedurally iterate over the data yourself. Stream operations can be implemented in a fashion very similar to MapReduce. Streams, just like MapReduce, allow you to perform processing upon the entirety of your cache, possibly a very large data set, but in an efficient way.
Streams are the preferred method when dealing with data that exists in the cache because streams automatically adjust to cluster topology changes.
Also, since we can control how the entries are iterated upon, we can perform the operations more efficiently in a distributed cache when you want all of the operations performed across the cluster concurrently.
A stream is retrieved from the entrySet, keySet or values collections returned from the Cache by invoking the stream or parallelStream methods.
10.2.1. Common stream operations
This section highlights various options that are present irrespective of what type of underlying cache you are using.
10.2.2. Key filtering
It is possible to filter the stream so that it only operates upon a given subset of keys. This can be done by invoking the filterKeys method on the CacheStream. This should always be used over a Predicate filter and will be faster than a predicate that holds all of the keys.
If you are familiar with the AdvancedCache interface, you may be wondering why you would even use getAll over this key filter. There are some small benefits (mostly smaller payloads) to using getAll if you need the entries as is and need them all in memory in the local node. However, if you need to do processing on these elements, a stream is recommended since you get both distributed and threaded parallelism for free.
10.2.3. Segment based filtering
This is an advanced feature and should only be used with deep knowledge of Red Hat Data Grid segment and hashing techniques. Segment-based filtering can be useful if you need to split data into separate invocations, for example when integrating with other tools such as Apache Spark.
This option is only supported for replicated and distributed caches. It allows the user to operate upon a subset of data at a time, as determined by the KeyPartitioner. The segments can be filtered by invoking the filterKeySegments method on the CacheStream. This is applied after the key filter but before any intermediate operations are performed.
10.2.4. Local/Invalidation
A stream used with a local or invalidation cache can be used just the same way you would use a stream on a regular collection. Red Hat Data Grid handles all of the translations if necessary behind the scenes and works with all of the more interesting options (ie. storeAsBinary, compatibility mode, and a cache loader). Only data local to the node where the stream operation is performed will be used, for example invalidation only uses local entries.
10.2.5. Example
The code below takes a cache and returns a map with all the cache entries whose values contain the string "JBoss":
Map<Object, String> jbossValues = cache.entrySet().stream()
    .filter(e -> e.getValue().contains("JBoss"))
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
10.3. Distribution/Replication/Scattered
This is where streams come into their stride. When a stream operation is performed it will send the various intermediate and terminal operations to each node that has pertinent data. This allows processing the intermediate values on the nodes owning the data, and only sending the final results back to the originating nodes, improving performance.
10.3.1. Rehash Aware
Internally the data is segmented and each node only performs the operations upon the data it owns as a primary owner. This allows for data to be processed evenly, assuming segments are granular enough to provide for equal amounts of data on each node.
When you are utilizing a distributed cache, the data can be reshuffled between nodes when a new node joins or leaves. Distributed Streams handle this reshuffling of data automatically so you don’t have to worry about monitoring when nodes leave or join the cluster. Reshuffled entries may be processed a second time, and we keep track of the processed entries at the key level or at the segment level (depending on the terminal operation) to limit the amount of duplicate processing.
It is possible but highly discouraged to disable rehash awareness on the stream. This should only be considered if your request can tolerate seeing only a subset of data if a rehash occurs. This can be done by invoking CacheStream.disableRehashAware(). The performance gain for most operations when a rehash doesn’t occur is completely negligible. The only exceptions are iterator and forEach, which will use less memory, since they do not have to keep track of processed keys.
Please rethink disabling rehash awareness unless you really know what you are doing.
10.3.2. Serialization
Since the operations are sent across to other nodes they must be serializable by Red Hat Data Grid marshalling. This allows the operations to be sent to the other nodes.
The simplest way is to use a CacheStream instance and use a lambda just as you would normally. Red Hat Data Grid overrides all of the various Stream intermediate and terminal methods to take Serializable versions of the arguments (i.e. SerializableFunction, SerializablePredicate…). You can find these methods at CacheStream. This relies on the spec to pick the most specific method as defined here.
In our previous example we used a Collector to collect all the results into a Map. Unfortunately the Collectors class doesn’t produce Serializable instances. Thus if you need to use these, there are two ways to do so:
One option would be to use the CacheCollectors class, which allows for a Supplier<Collector> to be provided. This instance could then use the Collectors to supply a Collector which is not serialized. You can read more details about how the collector performs in a distributed fashion at distributed execution.
Map<Object, String> jbossValues = cache.entrySet().stream()
    .filter(e -> e.getValue().contains("JBoss"))
    .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
Alternatively, you can avoid the use of CacheCollectors and instead use the overloaded collect methods that take Supplier<Collector>. These overloaded collect methods are only available via the CacheStream interface.
Map<Object, String> jbossValues = cache.entrySet().stream()
    .filter(e -> e.getValue().contains("JBoss"))
    .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
If, however, you are not able to use the Cache and CacheStream interfaces, you cannot use the Serializable arguments and must instead make the lambdas Serializable manually by casting them to multiple interfaces. It is not a pretty sight, but it gets the job done.
Map<Object, String> jbossValues = map.entrySet().stream()
    .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("JBoss"))
    .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
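To see what the intersection cast buys you, the following JDK-only sketch serializes a lambda and reads it back. It is an illustration under assumptions: plain Java serialization stands in for Red Hat Data Grid marshalling, and `roundTrip`, `serializableFilter` and `plainFilter` are hypothetical helper names. The cast lambda survives the round trip, while the same lambda without the cast is rejected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

public class SerializableLambdaDemo {
    /** Writes the object to bytes and reads it back, as sending it to another node would. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not serialize: " + e, e);
        }
    }

    /** The intersection cast makes the lambda implement Serializable as well as Predicate. */
    static Predicate<String> serializableFilter() {
        return (Serializable & Predicate<String>) s -> s.contains("JBoss");
    }

    /** The same lambda without the cast is an ordinary, non-serializable Predicate. */
    static Predicate<String> plainFilter() {
        return s -> s.contains("JBoss");
    }

    public static void main(String[] args) {
        Predicate<String> copy = roundTrip(serializableFilter());
        System.out.println("copy matches: " + copy.test("JBoss World"));
        try {
            roundTrip(plainFilter());
        } catch (IllegalStateException e) {
            System.out.println("plain lambda rejected: " + e.getCause());
        }
    }
}
```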
The recommended and most performant way is to use an AdvancedExternalizer, as this provides the smallest payload. Unfortunately this means you cannot use lambdas, as advanced externalizers require defining the class beforehand.
You can use an advanced externalizer as shown below:
Map<Object, String> jbossValues = cache.entrySet().stream()
    .filter(new ContainsFilter("JBoss"))
    .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

class ContainsFilter implements Predicate<Map.Entry<Object, String>> {
    private final String target;

    ContainsFilter(String target) {
        this.target = target;
    }

    @Override
    public boolean test(Map.Entry<Object, String> e) {
        return e.getValue().contains(target);
    }
}

class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> {

    @Override
    public Set<Class<? extends ContainsFilter>> getTypeClasses() {
        return Util.asSet(ContainsFilter.class);
    }

    @Override
    public Integer getId() {
        return CUSTOM_ID;
    }

    @Override
    public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException {
        output.writeUTF(object.target);
    }

    @Override
    public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
        return new ContainsFilter(input.readUTF());
    }
}
You could also use an advanced externalizer for the collector supplier to reduce the payload size even further.
Map<Object, String> jbossValues = cache.entrySet().stream()
    .filter(new ContainsFilter("JBoss"))
    .collect(ToMapCollectorSupplier.INSTANCE);

class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> {
    static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier();

    private ToMapCollectorSupplier() { }

    @Override
    public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() {
        return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue);
    }
}

class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> {

    @Override
    public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() {
        return Util.asSet(ToMapCollectorSupplier.class);
    }

    @Override
    public Integer getId() {
        return CUSTOM_ID;
    }

    @Override
    public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException {
    }

    @Override
    public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException {
        return ToMapCollectorSupplier.INSTANCE;
    }
}
10.3.3. Parallel Computation
Distributed streams by default try to parallelize as much as possible. It is possible for the end user to control this and actually they always have to control one of the options. There are 2 ways these streams are parallelized.
Local to each node
When a stream is created from the cache collection, the end user can choose between invoking the stream or parallelStream method. Choosing the parallel stream enables multiple threads on each node locally. Note that some operations, such as a rehash aware iterator and forEach, always use a sequential stream locally. This could be enhanced at some point to allow for parallel streams locally.
Users should be careful when using local parallelism, as it is only faster with a large number of entries or with operations that are computationally expensive. Also note that if you use a parallel stream with forEach, the action should not block, as it is executed on the common pool, which is normally reserved for computation operations.
Remote requests
When there are multiple nodes, it may be desirable to control whether the remote requests are all processed concurrently or one at a time. By default, all terminal operations except the iterator perform concurrent requests. The iterator method, to reduce overall memory pressure on the local node, performs only sequential requests, which actually performs slightly better.
If you wish to change this default, you can do so by invoking the sequentialDistribution or parallelDistribution methods on the CacheStream.
10.3.4. Task timeout
It is possible to set a timeout value for the operation requests. This timeout is used only for remote requests timing out and it is on a per request basis. The former means the local execution will not timeout and the latter means if you have a failover scenario as described above the subsequent requests each have a new timeout. If no timeout is specified it uses the replication timeout as a default timeout. You can set the timeout in your task by doing the following:
CacheStream<Object, String> stream = cache.entrySet().stream();
stream.timeout(1, TimeUnit.MINUTES);
For more information, see the Javadoc for the timeout method.
10.3.5. Injection
The Stream has a terminal operation called forEach which allows for running some sort of side effect operation on the data. In this case it may be desirable to get a reference to the Cache that is backing this Stream. If your Consumer implements the CacheAware interface, the injectCache method will be invoked before the accept method from the Consumer interface.
10.3.6. Distributed Stream execution
Distributed stream execution works in a fashion very similar to MapReduce, except that in this case we send zero to many intermediate operations (map, filter, etc.) and a single terminal operation to the various nodes. The operation basically comes down to the following:
- The desired segments are grouped by which node is the primary owner of the given segment
- A request is generated to send to each remote node that contains the intermediate and terminal operations including which segments it should process
- The terminal operation will be performed locally if necessary
- Each remote node will receive this request and run the operations and subsequently send the response back
- The local node will then gather the local response and remote responses together performing any kind of reduction required by the operations themselves.
- Final reduced response is then returned to the user
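The steps above can be sketched as a single-JVM simulation using only the JDK. Everything here is an assumption made for illustration: `segmentOf` is a stand-in for the real key-to-segment mapping, ownership is assigned round-robin, and each "node" is just a loop iteration. Each node applies the intermediate operation (a filter) and the terminal operation (a count) only to the segments it owns, and the originator then reduces the per-node results.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DistributedCountSketch {
    /** Hypothetical key-to-segment mapping; not the real KeyPartitioner. */
    static int segmentOf(String key, int numSegments) {
        return Math.floorMod(key.hashCode(), numSegments);
    }

    /** Step 1: group segments by primary owner (assumed here: segment s belongs to node s % nodeCount). */
    static Map<Integer, List<Integer>> segmentsByOwner(int numSegments, int nodeCount) {
        Map<Integer, List<Integer>> byOwner = new TreeMap<>();
        for (int segment = 0; segment < numSegments; segment++) {
            byOwner.computeIfAbsent(segment % nodeCount, n -> new ArrayList<>()).add(segment);
        }
        return byOwner;
    }

    /** Steps 2-4: one node runs the filter and the count on its own segments only. */
    static long countOnNode(Map<String, String> data, Set<Integer> segments,
                            int numSegments, String word) {
        return data.entrySet().stream()
                .filter(e -> segments.contains(segmentOf(e.getKey(), numSegments)))
                .filter(e -> e.getValue().contains(word))
                .count();
    }

    /** Steps 5-6: the originator adds up the per-node counts and returns the total. */
    static long distributedCount(Map<String, String> data, int numSegments,
                                 int nodeCount, String word) {
        long total = 0;
        for (List<Integer> segments : segmentsByOwner(numSegments, nodeCount).values()) {
            total += countOnNode(data, new HashSet<>(segments), numSegments, word);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("1", "Hello world");
        data.put("2", "JBoss World");
        data.put("3", "JBoss rules");
        data.put("4", "Infinispan community");
        System.out.println(distributedCount(data, 16, 3, "JBoss"));
    }
}
```

Because every segment has exactly one primary owner, each entry is counted once no matter how the segments are spread across nodes.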
In most cases all operations are fully distributed: the operations are all fully applied on each remote node, and usually only the last operation or something related is reapplied to reduce the results from multiple nodes. One important note is that intermediate values do not actually have to be serializable; only the last value sent back needs to be (exceptions for various operations are highlighted below).
Terminal operator distributed result reductions
The following paragraphs describe how the distributed reductions work for the various terminal operators. Some of these are special in that an intermediate value may be required to be serializable instead of the final result.
- allMatch noneMatch anyMatch
- The allMatch operation is run on each node and then all the results are logically anded together locally to get the appropriate value. The noneMatch and anyMatch operations use a logical or instead. These methods also have early termination support, stopping remote and local operations once the final result is known.
- collect
- The collect method is interesting in that it can do a few extra steps. The remote node performs everything as normal except it doesn’t perform the final finisher upon the result and instead sends back the fully combined results. The local thread then combines the remote and local result into a value which is then finally finished. The key here to remember is that the final value doesn’t have to be serializable but rather the values produced from the supplier and combiner methods.
- count
- The count method just adds the numbers together from each node.
- findAny findFirst
- The findAny operation returns just the first value it finds, whether it was from a remote node or locally. Note this supports early termination in that once a value is found it will not process others. Note the findFirst method is special since it requires a sorted intermediate operation, which is detailed in the exceptions section.
- max min
- The max and min methods find the respective min or max value on each node then a final reduction is performed locally to ensure only the min or max across all nodes is returned.
- reduce
- The various reduce methods serialize the result after the accumulator has reduced it as far as it can on each node. The local and remote results are then accumulated together locally, before being combined if you have provided a combiner. Note this means a value coming from the combiner doesn’t have to be Serializable.
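The collect reduction described in the list above can be sketched with the standard `Collector` API. This is a single-JVM model under assumptions, not Red Hat Data Grid internals: `distributedCollect` is a hypothetical helper and each inner list stands for the data held by one node. Each node uses the supplier and accumulator but skips the finisher; the originator combines the partial containers and applies the finisher once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectReductionSketch {
    static <T, A, R> R distributedCollect(List<List<T>> dataPerNode, Collector<T, A, R> collector) {
        // On each "node": accumulate into a fresh container, but do not finish it.
        List<A> partials = new ArrayList<>();
        for (List<T> nodeData : dataPerNode) {
            A container = collector.supplier().get();
            for (T element : nodeData) {
                collector.accumulator().accept(container, element);
            }
            partials.add(container); // this container is what would travel over the wire
        }
        // On the originator: combine all partial containers, then finish exactly once.
        A combined = collector.supplier().get();
        for (A partial : partials) {
            combined = collector.combiner().apply(combined, partial);
        }
        return collector.finisher().apply(combined);
    }

    public static void main(String[] args) {
        List<List<Integer>> nodes = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5, 6));
        // averagingInt keeps a running sum and count; only the finisher produces the Double.
        Collector<Integer, ?, Double> average = Collectors.averagingInt(i -> i);
        System.out.println(distributedCollect(nodes, average));
    }
}
```

The averaging collector is a useful test case because finishing on each node and then averaging the averages would give the wrong answer when nodes hold different numbers of entries.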
10.3.7. Key based rehash aware operators
The iterator, spliterator and forEach are unlike the other terminal operators in that the rehash awareness has to keep track of what keys per segment have been processed instead of just segments. This is to guarantee an exactly once (iterator & spliterator) or at least once behavior (forEach) even under cluster membership changes.
The iterator and spliterator operators, when invoked on a remote node, return batches of entries, where the next batch is only sent back after the last has been fully consumed. This batching is done to limit how many entries are in memory at a given time. The user node holds onto which keys it has processed and, when a given segment is completed, releases those keys from memory. This is why sequential processing is preferred for the iterator method, so only a subset of segment keys are held in memory at once, instead of keys from all nodes.
The forEach() method also returns batches, but it returns a batch of keys after it has finished processing at least a batch worth of keys. This way the originating node knows which keys have already been processed, reducing the chance of processing the same entry again. Unfortunately this means it is possible to have an at least once behavior when a node goes down unexpectedly. In this case that node could have been processing a batch without having completed it, and the entries that were processed but not part of a completed batch will be run again when the rehash failover operation occurs. Note that adding a node will not cause this issue, as the rehash failover doesn’t occur until all responses are received.
The batch sizes of these operations are both controlled by the same value, which can be configured by invoking the distributedBatchSize method on the CacheStream. This value defaults to the chunkSize configured in state transfer. Unfortunately this value is a tradeoff between memory usage, performance and at least once behavior, and your mileage may vary.
Using iterator with replicated and distributed caches
When a node is the primary or backup owner of all requested segments for a distributed stream, Red Hat Data Grid performs the iterator or spliterator terminal operations locally, which optimizes performance as remote iterations are more resource intensive.
This optimization applies to both replicated and distributed caches. However, Red Hat Data Grid performs iterations remotely when using cache stores that are both shared and have write-behind enabled. In this case performing the iterations remotely ensures consistency.
10.3.8. Intermediate operation exceptions
There are some intermediate operations that have special exceptions: skip, peek, sorted and distinct. All of these methods have some sort of artificial iterator implanted in the stream processing to guarantee correctness, as documented below. Note this means these operations may cause possibly severe performance degradation.
- Skip
- An artificial iterator is implanted up to the intermediate skip operation. Then results are brought locally so it can skip the appropriate amount of elements.
- Sorted
- WARNING: This operation requires having all entries in memory on the local node. An artificial iterator is implanted up to the intermediate sorted operation. All results are sorted locally. There are possible plans to have a distributed sort which returns batches of elements, but this is not yet implemented.
- Distinct
- WARNING: This operation requires having all or nearly all entries in memory on the local node. Distinct is performed on each remote node and then an artificial iterator returns those distinct values. Then finally all of those results have a distinct operation performed upon them.
The rest of the intermediate operations are fully distributed as one would expect.
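The two-phase handling of distinct described above can be modelled with plain JDK streams. This is a sketch under assumptions: `distributedDistinct` is a hypothetical helper and each inner list stands for the values held on one node. Duplicates are first removed on each node, and the merged results are then deduplicated again on the originator, which is why nearly all distinct values end up in local memory.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctSketch {
    static <T> List<T> distributedDistinct(List<List<T>> dataPerNode) {
        return dataPerNode.stream()
                // Phase 1: each "node" removes its own duplicates before responding.
                .map(nodeData -> nodeData.stream().distinct().collect(Collectors.toList()))
                // Phase 2: the originator merges the responses and removes duplicates
                // that appeared on more than one node.
                .flatMap(List::stream)
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nodes = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "c"));
        System.out.println(distributedDistinct(nodes));
    }
}
```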
10.3.9. Examples
Word Count
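Word count is a classic, if overused, example of the map/reduce paradigm. Assume we have a mapping of keys to sentences stored in the cache, where both are Strings, and we want to count the occurrences of every word across all sentences.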
public class WordCountExample {

    /**
     * In this example replace c1 and c2 with
     * real Cache references
     *
     * @param args
     */
    public static void main(String[] args) {
        Cache<String, String> c1 = ...;
        Cache<String, String> c2 = ...;

        c1.put("1", "Hello world here I am");
        c2.put("2", "Infinispan rules the world");
        c1.put("3", "JUDCon is in Boston");
        c2.put("4", "JBoss World is in Boston as well");
        c1.put("12","JBoss Application Server");
        c2.put("15", "Hello world");
        c1.put("14", "Infinispan community");
        c2.put("15", "Hello world");

        c1.put("111", "Infinispan open source");
        c2.put("112", "Boston is close to Toronto");
        c1.put("113", "Toronto is a capital of Ontario");
        c2.put("114", "JUDCon is cool");
        c1.put("211", "JBoss World is awesome");
        c2.put("212", "JBoss rules");
        c1.put("213", "JBoss division of RedHat ");
        c2.put("214", "RedHat community");

        Map<String, Long> wordCountMap = c1.entrySet().parallelStream()
            .map(e -> e.getValue().split("\\s"))
            .flatMap(Arrays::stream)
            .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }
}
In this case it is pretty simple to do the word count from the previous example.
However what if we want to find the most frequent word in the example? If you take a second to think about this case you will realize you need to have all words counted and available locally first. Thus we actually have a few options.
We could use a finisher on the collector, which is invoked on the user thread after all the results have been collected. Some redundant lines have been removed from the previous example.
public class WordCountExample {
    public static void main(String[] args) {
        // Lines removed

        String mostFrequentWord = c1.entrySet().parallelStream()
            .map(e -> e.getValue().split("\\s"))
            .flatMap(Arrays::stream)
            .collect(() -> Collectors.collectingAndThen(
                Collectors.groupingBy(Function.identity(), Collectors.counting()),
                wordCountMap -> {
                    String mostFrequent = null;
                    long maxCount = 0;
                    for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                        int count = e.getValue().intValue();
                        if (count > maxCount) {
                            maxCount = count;
                            mostFrequent = e.getKey();
                        }
                    }
                    return mostFrequent;
                }));
    }
}
Unfortunately the last step is only going to be run in a single thread, which could be quite slow if we have a lot of words. Maybe there is another way to parallelize this with Streams.
We mentioned before we are in the local node after processing, so we could actually use a stream on the map results. We can therefore use a parallel stream on the results.
public class WordFrequencyExample {
    public static void main(String[] args) {
        // Lines removed

        Map<String, Long> wordCount = c1.entrySet().parallelStream()
            .map(e -> e.getValue().split("\\s"))
            .flatMap(Arrays::stream)
            .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce(
            (e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
    }
}
This way you can still utilize all of the cores locally when calculating the most frequent element.
Remove specific entries
Distributed streams can also be used as a way to modify data where it lives. For example you may want to remove all entries in your cache that contain a specific word.
public class RemoveBadWords { public static void main(String[] args) { // Lines removed String word = .. c1.entrySet().parallelStream() .filter(e -> e.getValue().contains(word)) .forEach((c, e) -> c.remove(e.getKey());
If we carefully note what is serialized and what is not, we notice that only the word along with the operations are serialized across to other nods as it is captured by the lambda. However the real saving piece is that the cache operation is performed on the primary owner thus reducing the amount of network traffic required to remove these values from the cache. The cache is not captured by the lambda as we provide a special BiConsumer method override that when invoked on each node passes the cache to the BiConsumer
One thing to keep in mind using the forEach
command in this manner is that the underlying stream obtains no locks. The cache remove operation will still obtain locks naturally, but the value could have changed from what the stream saw. That means that the entry could have been changed after the stream read it but the remove actually removed it.
We have specifically added a new variant for this, which is called LockedStream.
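The race described above can be illustrated with JDK types alone. The sketch below is not the LockedStream API; it uses the conditional remove(key, value) method of java.util.concurrent.ConcurrentMap (an interface that Cache also extends), with a ConcurrentHashMap standing in for the cache. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: a ConcurrentHashMap stands in for the cache.
final class ConditionalRemoveSketch {

    // Removes entries whose value contains the word, but only when the value is
    // still equal to the one that was inspected. Returns the number of removals.
    static int removeIfUnchanged(ConcurrentMap<String, String> map, String word) {
        int removed = 0;
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String seen = entry.getValue();
            // remove(key, value) is atomic and does nothing if the value changed meanwhile
            if (seen.contains(word) && map.remove(entry.getKey(), seen)) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("k1", "a bad word");
        map.put("k2", "all fine");
        System.out.println("removed: " + removeIfUnchanged(map, "bad"));

        // Simulate a concurrent update between reading and removing.
        map.put("k3", "bad again");
        String stale = map.get("k3");
        map.put("k3", "now clean");
        System.out.println("stale remove succeeded: " + map.remove("k3", stale));
    }
}
```

An unconditional remove(key), as used in the forEach example, would delete "k3" even though its current value no longer contains the word.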
Plenty of other examples
The Streams API is a JRE tool and there are lots of examples for using it. Just remember that your operations need to be Serializable in some way.
10.4. Distributed Execution
Distributed Executor has been deprecated as of Red Hat Data Grid 9.1. You should use either a Cluster Executor or a Distributed Stream to perform the operations that were previously done with the Distributed Executor.
Red Hat Data Grid provides distributed execution through the standard JDK ExecutorService interface. Tasks submitted for execution are not executed in the local JVM; they are executed on the entire cluster of Red Hat Data Grid nodes. Every DistributedExecutorService is bound to one particular cache. A submitted task has access to key/value pairs from that cache if and only if it is an instance of DistributedCallable. Nothing prevents users from submitting a familiar Runnable or Callable, just as with any other ExecutorService. However, DistributedExecutorService, as its name implies, will likely migrate a submitted Callable or Runnable to another JVM in the Red Hat Data Grid cluster, execute it there, and return the result to the task invoker. Because a task may migrate to other nodes, every Callable, Runnable, or DistributedCallable submitted must be either Serializable or Externalizable. The value returned from a callable must be Serializable or Externalizable as well; if it is not, a NotSerializableException is thrown.
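Whether a task can be shipped at all can be checked locally before it is submitted. The sketch below assumes plain Java serialization and mimics the trip to another JVM with an in-memory round trip; SerializableTaskCheck, SumTask, roundTrip, and isShippable are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

final class SerializableTaskCheck {

    // A task that is both Callable and Serializable, as a migrated task must be.
    static final class SumTask implements Callable<Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        private final int upTo;

        SumTask(int upTo) { this.upTo = upTo; }

        @Override
        public Integer call() {
            int sum = 0;
            for (int i = 1; i <= upTo; i++) sum += i;
            return sum;
        }
    }

    // Serializes and deserializes the object in memory, as a stand-in for sending it to another node.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T task) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns false when Java serialization rejects the task with NotSerializableException.
    static boolean isShippable(Object task) {
        try {
            roundTrip(task);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> plain = () -> 42; // an ordinary lambda is not Serializable
        Callable<Integer> shippable = (Callable<Integer> & Serializable) () -> 42; // intersection cast
        System.out.println("plain lambda shippable: " + isShippable(plain));
        System.out.println("cast lambda shippable: " + isShippable(shippable));
        System.out.println("SumTask after round trip: " + roundTrip(new SumTask(4)).call());
    }
}
```

The intersection cast is the same idea the Cluster Executor uses for its functional arguments: the lambda's target type is both the functional interface and Serializable.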
Red Hat Data Grid’s distributed task executors use data from Red Hat Data Grid cache nodes as input for execution tasks. Most other distributed frameworks do not have that advantage, and users have to specify input for distributed tasks from some well-known location. Furthermore, users of the Red Hat Data Grid distributed execution framework do not have to configure a store for intermediate and final results, which removes another layer of complexity and maintenance.
Our distributed execution framework capitalizes on the fact that input data in Red Hat Data Grid is already load balanced (in the case of DIST mode). Since the input data is already balanced, execution tasks are automatically balanced as well; users do not have to explicitly assign work tasks to specific Red Hat Data Grid nodes. However, the framework also allows users to specify an arbitrary subset of cache keys as input for distributed execution tasks.
10.4.1. DistributedCallable API
If users need access to Red Hat Data Grid cache data during the execution of a task, we recommend that they encapsulate the task in the DistributedCallable interface. DistributedCallable is a subtype of the existing Callable from the java.util.concurrent package; a DistributedCallable can be executed in a remote JVM and receive input from a Red Hat Data Grid cache. The task’s main algorithm can essentially remain unchanged; only the input source changes. Existing Callable implementations most likely get their input in the form of some Java object or primitive, while a DistributedCallable gets its input from a Red Hat Data Grid cache. Therefore, users who have already implemented the Callable interface to describe their task units would simply implement DistributedCallable instead and use keys from the Red Hat Data Grid execution environment as input for the task. An implementation of DistributedCallable can in fact continue to serve as an ordinary Callable while also being ready for distributed execution.
public interface DistributedCallable<K, V, T> extends Callable<T> {

    /**
     * Invoked by execution environment after DistributedCallable
     * has been migrated for execution to a specific node.
     *
     * @param cache
     *           cache whose keys are used as input data for this
     *           DistributedCallable task
     * @param inputKeys
     *           keys used as input for this DistributedCallable task
     */
    public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);
}
10.4.2. Callable and CDI
Users who do not want to or cannot implement DistributedCallable, yet need a reference to the input cache used in DistributedExecutorService, have the option of having the input cache injected by the CDI mechanism. When the user’s Callable arrives at a Red Hat Data Grid executing node, the Red Hat Data Grid CDI mechanism provides the appropriate cache reference and injects it into the executing Callable. All one has to do is declare a Cache field in the Callable and annotate it with the org.infinispan.cdi.Input annotation along with the mandatory @Inject annotation.
public class CallableWithInjectedCache implements Callable<Integer>, Serializable {

    @Inject
    @Input
    private Cache<String, String> cache;

    @Override
    public Integer call() throws Exception {
        // use injected cache reference
        return 1;
    }
}
10.4.3. DistributedExecutorService, DistributedTaskBuilder and DistributedTask API
DistributedExecutorService is a simple extension of the familiar ExecutorService from the java.util.concurrent package. However, the advantages of DistributedExecutorService are not to be overlooked. Existing Callable tasks, instead of being executed in the JDK’s ExecutorService, are also eligible for execution on a Red Hat Data Grid cluster. The Red Hat Data Grid execution environment migrates a task to the execution node(s), runs the task, and returns the result(s) to the calling node. Of course, not all Callable tasks benefit from parallel distributed execution. Excellent candidates are long-running and computationally intensive tasks that can run concurrently, and tasks using input data that can be processed concurrently. For more details about good candidates for parallel execution and parallel algorithms in general, refer to Introduction to Parallel Computing.
The second advantage of the DistributedExecutorService is that it allows a quick and simple implementation of tasks that take input from Red Hat Data Grid cache nodes, execute a certain computation, and return results to the caller. Users specify which keys to use as input for a given DistributedCallable and submit that callable for execution on the Red Hat Data Grid cluster. The Red Hat Data Grid runtime locates the appropriate keys, migrates the DistributedCallable to the target execution node(s), and finally returns a list of results for each executed Callable. Users can also omit the input keys, in which case Red Hat Data Grid executes the DistributedCallable on all keys of the specified cache.
Let’s see how we can use DistributedExecutorService. If you already have Callable/Runnable tasks defined, simply submit them to an instance of DefaultExecutorService for execution.
ExecutorService des = new DefaultExecutorService(cache);
Future<Boolean> future = des.submit(new SomeCallable());
Boolean r = future.get();
If you need to specify more task parameters, such as a task timeout, a custom failover policy, or an execution policy, use the DistributedTaskBuilder and DistributedTask API.
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.timeout(10, TimeUnit.SECONDS);
...
...
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();
10.4.4. Distributed task failover
The distributed execution framework supports task failover. By default no failover policy is installed, and the task’s Runnable/Callable/DistributedCallable will simply fail. When a policy is installed, the failover mechanism is invoked in the following cases:
a) Failover due to a node failure where task is executing
b) Failover due to a task failure (e.g. Callable task throws Exception).
Red Hat Data Grid provides a random node failover policy, which attempts to execute the failed part of a distributed task on another random node, if such a node is available. Users who need a more sophisticated failover policy can implement the DistributedTaskFailoverPolicy interface. For example, users might want to use a consistent hashing (CH) mechanism for failover of uncompleted tasks. A CH-based failover might, for example, migrate a failed task T to the cluster node(s) holding a backup of the input data that was being processed on the failed node F.
/**
 * DistributedTaskFailoverPolicy allows pluggable fail over target selection for a failed remotely
 * executed distributed task.
 */
public interface DistributedTaskFailoverPolicy {

    /**
     * Parts of a distributively executed task can fail because the task itself throws an exception
     * or because of a system caused failure (e.g. a node failed or left the cluster during task
     * execution).
     *
     * @param context
     *           the FailoverContext of the failed execution
     * @return the Address of the node selected for fail over execution
     */
    Address failover(FailoverContext context);

    /**
     * Maximum number of fail over attempts permitted by this DistributedTaskFailoverPolicy
     *
     * @return max number of fail over attempts
     */
    int maxFailoverAttempts();
}
Therefore one could for example specify random failover execution policy simply by:
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.failoverPolicy(DefaultExecutorService.RANDOM_NODE_FAILOVER);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();
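The idea behind a random node failover with a bounded number of attempts can be simulated without a cluster. The sketch below is not the Red Hat Data Grid implementation: node addresses are plain strings, the task is a Function from node name to result, and RandomFailoverSketch, pickFailoverTarget, and runWithFailover are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Simulation only: strings stand in for node addresses.
final class RandomFailoverSketch {

    // Picks a random node other than the one that just failed, or null if none is available.
    static String pickFailoverTarget(List<String> nodes, String failedNode, Random random) {
        List<String> candidates = new ArrayList<>(nodes);
        candidates.remove(failedNode);
        return candidates.isEmpty() ? null : candidates.get(random.nextInt(candidates.size()));
    }

    // Runs the task on firstNode; on failure retries on a random other node,
    // up to maxFailoverAttempts additional times.
    static <T> T runWithFailover(List<String> nodes, String firstNode, Function<String, T> task,
                                 int maxFailoverAttempts, Random random) {
        String node = firstNode;
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxFailoverAttempts; attempt++) {
            try {
                return task.apply(node);
            } catch (RuntimeException e) {
                last = e;
                node = pickFailoverTarget(nodes, node, random);
                if (node == null) {
                    break; // no other node left to try
                }
            }
        }
        throw new IllegalStateException("Task failed on all attempted nodes", last);
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("A", "B", "C");
        // The task fails whenever it runs on node A.
        Function<String, String> task = n -> {
            if (n.equals("A")) throw new IllegalStateException("node A is down");
            return "ran on " + n;
        };
        System.out.println(runWithFailover(nodes, "A", task, 1, new Random()));
    }
}
```

With zero failover attempts the first failure is final, which mirrors the default behaviour described above when no failover policy is installed.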
10.4.5. Distributed task execution policy
DistributedTaskExecutionPolicy is an enum that allows a task to specify its own execution policy across the Red Hat Data Grid cluster. DistributedTaskExecutionPolicy effectively scopes the execution of tasks to a subset of nodes. For example, someone might want to execute tasks exclusively on a local network site rather than also on a remote backup site. Others might, for example, use only a dedicated subset of Red Hat Data Grid nodes in a certain rack for specific task execution. DistributedTaskExecutionPolicy is set per instance of DistributedTask.
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.executionPolicy(DistributedTaskExecutionPolicy.SAME_RACK);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();
10.4.6. Examples
Pi approximation can greatly benefit from parallel distributed execution in DistributedExecutorService. Recall that the area of a square with side 2r is Sa = 4r^2 and the area of the circle inscribed in it is Ca = pi * r^2. Solving the second equation for r^2 and substituting into the first gives pi = 4 * Ca/Sa. Now, imagine that we can shoot a very large number of darts into the square; the ratio of darts that land inside the circle to the total number of darts shot approximates Ca/Sa. Since pi = 4 * Ca/Sa, we can easily derive an approximate value of pi. The more darts we shoot, the better the approximation. In the example below we shoot 10 million darts, but instead of "shooting" them serially we parallelize the dart shooting across the entire Red Hat Data Grid cluster.
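Written out, the derivation reads as follows. The symbols N_in and N (darts inside the circle and darts thrown) are introduced here only for illustration. The listing that follows uses the special case r = 1/2, that is, a unit square with a circle of radius 0.5 centred at (0.5, 0.5).

```latex
\[
S_a = (2r)^2 = 4r^2, \qquad C_a = \pi r^2
\quad\Longrightarrow\quad
\frac{C_a}{S_a} = \frac{\pi r^2}{4r^2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi = 4\,\frac{C_a}{S_a} \approx 4\,\frac{N_{\text{in}}}{N}
\]
```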
public class PiAppx {

    // Future.get() throws checked exceptions, so main declares them
    public static void main(String[] arg) throws Exception {
        List<Cache> caches = ...;
        Cache cache = ...;

        int numPoints = 10000000;
        int numServers = caches.size();
        int numberPerWorker = numPoints / numServers;

        DistributedExecutorService des = new DefaultExecutorService(cache);
        long start = System.currentTimeMillis();
        CircleTest ct = new CircleTest(numberPerWorker);
        // Run the same task on every node and collect one partial count per node
        List<Future<Integer>> results = des.submitEverywhere(ct);
        int countCircle = 0;
        for (Future<Integer> f : results) {
            countCircle += f.get();
        }
        double appxPi = 4.0 * countCircle / numPoints;

        System.out.println("Distributed PI appx is " + appxPi +
                " completed in " + (System.currentTimeMillis() - start) + " ms");
    }

    private static class CircleTest implements Callable<Integer>, Serializable {

        /** The serialVersionUID */
        private static final long serialVersionUID = 3496135215525904755L;

        private final int loopCount;

        public CircleTest(int loopCount) {
            this.loopCount = loopCount;
        }

        @Override
        public Integer call() throws Exception {
            int insideCircleCount = 0;
            for (int i = 0; i < loopCount; i++) {
                double x = Math.random();
                double y = Math.random();
                if (insideCircle(x, y))
                    insideCircleCount++;
            }
            return insideCircleCount;
        }

        // True when (x, y) lies within the circle of radius 0.5 centred at (0.5, 0.5)
        private boolean insideCircle(double x, double y) {
            return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
                    <= Math.pow(0.5, 2);
        }
    }
}
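Since DistributedExecutorService extends the JDK ExecutorService, the same dart-throwing task can be tried on a single machine with a plain thread pool. The following is a minimal sketch under that assumption; LocalPiAppx and approximatePi are hypothetical names, and ThreadLocalRandom is swapped in for Math.random() to avoid contention between threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

// Local stand-in for the distributed example: threads play the role of cluster nodes.
final class LocalPiAppx {

    // Throws loopCount darts at the unit square and counts those inside the inscribed circle.
    static final class CircleTest implements Callable<Integer> {
        private final int loopCount;

        CircleTest(int loopCount) { this.loopCount = loopCount; }

        @Override
        public Integer call() {
            ThreadLocalRandom random = ThreadLocalRandom.current();
            int inside = 0;
            for (int i = 0; i < loopCount; i++) {
                double x = random.nextDouble() - 0.5;
                double y = random.nextDouble() - 0.5;
                if (x * x + y * y <= 0.25) inside++;
            }
            return inside;
        }
    }

    // Splits the darts evenly across the workers and combines the partial counts.
    static double approximatePi(int numPoints, int workers) {
        int perWorker = numPoints / workers;
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < workers; i++) tasks.add(new CircleTest(perWorker));
            long inside = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) inside += f.get();
            // Divide by the darts actually thrown, in case numPoints is not a multiple of workers
            return 4.0 * inside / ((long) perWorker * workers);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Local PI appx is " + approximatePi(10_000_000, 4));
    }
}
```

The result varies from run to run because the darts are random; more darts narrow the spread around pi.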