Chapter 16. MapReduce
Important
Map/Reduce has been deprecated in JBoss Data Grid 6.6.0, and is expected to be removed in subsequent versions. This feature will be replaced by Distributed Streams, which was shown to have better performance.
The Red Hat JBoss Data Grid MapReduce model is an adaptation of Google's MapReduce model.
MapReduce is a programming model used to process and generate large data sets. It is typically used in distributed computing environments where nodes are clustered. In JBoss Data Grid, MapReduce allows transparent distributed processing of large amounts of data across the grid. It does this by performing computations locally where the data is stored whenever possible.
MapReduce uses the two distinct computational phases of map and reduce to process information requests through the data grid. The process occurs as follows:
- The user initiates a task on a cache instance, which runs on a cluster node (the master node).
- The master node receives the task input, divides the task, and sends tasks for map phase execution on the grid.
- Each node executes a Mapper function on its input, and returns intermediate results back to the master node.
  - If the useIntermediateSharedCache parameter is set to true, the map results are inserted in an intermediary cache, rather than being returned to the master node.
  - If a Combiner has been specified with task.combinedWith(combiner), the Combiner is called on the Mapper results and the combiner's results are returned to the master node or inserted in the intermediary cache.
Note
Combiners are optional, and can be used only when the reduce function is both commutative (changing the order of the operands does not change the result) and associative (the order in which the operations are performed does not matter as long as the sequence of the operands is not changed). Where they apply, Combiners can speed up MapReduceTask execution.
- The master node collects all intermediate results from the map phase and merges all intermediate values associated with the same intermediate key.
  - If the distributedReducePhase parameter is set to true, the merging of the intermediate values is done on each node, as the Mapper or Combiner results are inserted in the intermediary cache. The master node only receives the intermediate keys.
- The master node sends intermediate key/value pairs for reduction on the grid.
  - If the distributedReducePhase parameter is set to false, the reduction phase is executed only on the master node.
- The final results of the reduction phase are returned. Optionally specify the target cache for the results using the instructions in Section 16.1.2, “Specify the Target Cache”.
  - If the distributedReducePhase parameter is set to true, the master node running the task receives all results from the reduction phase and returns the final result to the MapReduce task initiator.
  - If no target cache is specified and no collator is specified (using task.execute(Collator)), the result map is returned to the master node.
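The phases listed above can be sketched in plain Java. This is only a simulation under stated assumptions: it does not use the JBoss Data Grid API, each "node" is modeled as an ordinary Map of cache entries, and the class and method names (MapReducePhasesSketch, mapOnNode, mergeByKey, reduce) are hypothetical. It counts words, with the map phase running per node and the merge and reduce steps running on a single "master".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the phases described above; not the JBoss Data Grid API.
class MapReducePhasesSketch {

    // Map phase on one node: emit (word, 1) for every word in every locally stored value.
    static Map<String, List<Integer>> mapOnNode(Map<String, String> localEntries) {
        Map<String, List<Integer>> intermediate = new HashMap<>();
        for (String text : localEntries.values()) {
            for (String word : text.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    intermediate.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
                }
            }
        }
        return intermediate;
    }

    // Master node: merge all intermediate values that share the same intermediate key.
    static Map<String, List<Integer>> mergeByKey(List<Map<String, List<Integer>>> perNodeResults) {
        Map<String, List<Integer>> merged = new HashMap<>();
        for (Map<String, List<Integer>> nodeResult : perNodeResults) {
            nodeResult.forEach((key, values) ->
                    merged.computeIfAbsent(key, k -> new ArrayList<>()).addAll(values));
        }
        return merged;
    }

    // Reduce phase: collapse the values of each intermediate key to a single value.
    static Map<String, Integer> reduce(Map<String, List<Integer>> merged) {
        Map<String, Integer> result = new TreeMap<>();
        merged.forEach((key, values) ->
                result.put(key, values.stream().mapToInt(Integer::intValue).sum()));
        return result;
    }

    // Runs map on every node, then merges and reduces on the "master".
    static Map<String, Integer> run(List<Map<String, String>> nodes) {
        List<Map<String, List<Integer>>> perNode = new ArrayList<>();
        for (Map<String, String> node : nodes) {
            perNode.add(mapOnNode(node));
        }
        return reduce(mergeByKey(perNode));
    }

    public static void main(String[] args) {
        Map<String, String> nodeA = Collections.singletonMap("k1", "hello world");
        Map<String, String> nodeB = Collections.singletonMap("k2", "hello grid");
        System.out.println(run(Arrays.asList(nodeA, nodeB))); // prints {grid=1, hello=2, world=1}
    }
}
```

In this sketch the reduce step always runs in one place, which corresponds to distributedReducePhase being false; a distributed reduce would instead partition the merged keys across nodes.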
16.1. The MapReduce API
In Red Hat JBoss Data Grid, each MapReduce task has five main components:
- Mapper
- Reducer
- Collator
- MapReduceTask
- Combiner
The Mapper implementation is a component of MapReduceTask and is invoked once per input cache entry key/value pair. Map is the process of applying a given function to each element of a list, returning a list of results.
Each node in the JBoss Data Grid executes the Mapper on a given cache entry key/value input pair. It then transforms this cache entry key/value pair into an intermediate key/value pair, which is emitted into the provided Collector instance.
Note
The MapReduceTask requires a Mapper and a Reducer but using a Collator or Combiner is optional.
Example 16.1. Executing the Mapper
public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
    /**
     * Invoked once for each input cache entry KIn,VIn pair.
     */
    void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}
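As an illustration of the Mapper contract, the following is a minimal word-count sketch. To keep it runnable without the product libraries, it declares local stand-in Collector and Mapper interfaces that mirror the signatures shown in this chapter; they are assumptions for the example, not the library classes, and the names MapperSketch, WordCountMapper, and emitted are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

class MapperSketch {

    // Local stand-ins mirroring the interfaces in this chapter (not the library classes).
    interface Collector<K, V> {
        void emit(K key, V value);
    }

    interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    // Emits (word, 1) for each whitespace-separated word in the cache entry value.
    static class WordCountMapper implements Mapper<String, String, String, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public void map(String key, String value, Collector<String, Integer> collector) {
            StringTokenizer tokens = new StringTokenizer(value);
            while (tokens.hasMoreTokens()) {
                collector.emit(tokens.nextToken(), 1);
            }
        }
    }

    // Runs the mapper over one entry and records each emitted pair as "word=count".
    static List<String> emitted(String key, String value) {
        List<String> out = new ArrayList<>();
        new WordCountMapper().map(key, value, (k, v) -> out.add(k + "=" + v));
        return out;
    }

    public static void main(String[] args) {
        // The mapper emits one pair per occurrence; duplicates are resolved later by the Reducer.
        System.out.println(emitted("doc1", "to be or not to be"));
    }
}
```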
At this stage, for each output key there may be multiple output values. The multiple values must be reduced to a single value, and this is the task of the Reducer. JBoss Data Grid's distributed execution environment creates one instance of Reducer per execution node.
Example 16.2. Reducer
public interface Reducer<KOut, VOut> extends Serializable {
    /**
     * Combines/reduces all intermediate values for a particular intermediate key to a single value.
     */
    VOut reduce(KOut reducedKey, Iterator<VOut> iter);
}
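A Reducer for the word-count case could look like the sketch below. The Reducer interface here is a local stand-in that copies the signature shown above so the example compiles on its own; the names ReducerSketch and SumReducer are hypothetical.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

class ReducerSketch {

    // Local stand-in mirroring the Reducer signature in this chapter (not the library class).
    interface Reducer<KOut, VOut> extends Serializable {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // Sums all intermediate counts that were emitted for one key.
    static class SumReducer implements Reducer<String, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer reduce(String reducedKey, Iterator<Integer> iter) {
            int sum = 0;
            while (iter.hasNext()) {
                sum += iter.next();
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        Integer total = new SumReducer().reduce("be", Arrays.asList(1, 1, 1).iterator());
        System.out.println("be -> " + total); // prints be -> 3
    }
}
```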
The same Reducer interface is used for Combiners. A Combiner is similar to a Reducer, except that it must be able to work on partial results. The Combiner is executed on the results of the Mapper, on the same node, without considering the other nodes that might have generated values for the same intermediate key.
Note
Combiners are optional, and can be used only when the reduce function is both commutative (changing the order of the operands does not change the result) and associative (the order in which the operations are performed does not matter as long as the sequence of the operands is not changed). Where they apply, Combiners can speed up MapReduceTask execution.
Because Combiners only see part of the intermediate values, they cannot be used in all scenarios. When they can be used, however, they significantly reduce network traffic and memory consumption in the intermediate cache.
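A small worked example may help show why the operation matters. The sketch below is plain Java with hypothetical names (CombinerSafetySketch, NODE_A, NODE_B) and models the values for one intermediate key as produced on two nodes. Summing per node and then summing the partial results gives the same answer as summing everything at once, so a sum is safe to use as a Combiner. Averaging per node and then averaging the averages does not, so a plain mean is not.

```java
import java.util.Arrays;
import java.util.List;

class CombinerSafetySketch {

    // Values for one intermediate key, as produced by the Mapper on two different nodes.
    static final List<Double> NODE_A = Arrays.asList(1.0, 2.0, 3.0);
    static final List<Double> NODE_B = Arrays.asList(10.0);
    static final List<Double> ALL = Arrays.asList(1.0, 2.0, 3.0, 10.0);

    static double sum(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    static double mean(List<Double> values) {
        return sum(values) / values.size();
    }

    // Combine per node first, then reduce the partial results.
    static double sumWithCombiner() {
        return sum(Arrays.asList(sum(NODE_A), sum(NODE_B)));
    }

    static double meanWithCombiner() {
        return mean(Arrays.asList(mean(NODE_A), mean(NODE_B)));
    }

    public static void main(String[] args) {
        System.out.println(sumWithCombiner() + " vs " + sum(ALL));   // prints 16.0 vs 16.0
        System.out.println(meanWithCombiner() + " vs " + mean(ALL)); // prints 6.0 vs 4.0
    }
}
```

The mean differs because the per-node step discards how many values each partial result represents.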
The Collator coordinates results from Reducers that have been executed on JBoss Data Grid, and assembles a final result that is delivered to the initiator of the MapReduceTask. The Collator is applied to the final map key/value result of MapReduceTask.
Example 16.3. Assembling the Result
public interface Collator<KOut, VOut, R> {
    /**
     * Collates all reduced results and returns R to invoker of distributed task.
     *
     * @return final result of distributed task computation
     */
    R collate(Map<KOut, VOut> reducedResults);
}
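As one possible use of a Collator, the sketch below turns a reduced word-count map into a single answer: the most frequent word. The Collator interface is a local stand-in copying the signature above, and the names CollatorSketch and MostFrequentWord are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class CollatorSketch {

    // Local stand-in mirroring the Collator signature in this chapter (not the library class).
    interface Collator<KOut, VOut, R> {
        R collate(Map<KOut, VOut> reducedResults);
    }

    // Returns the key with the highest count, or null if the result map is empty.
    static class MostFrequentWord implements Collator<String, Integer, String> {
        @Override
        public String collate(Map<String, Integer> reducedResults) {
            String best = null;
            int bestCount = Integer.MIN_VALUE;
            for (Map.Entry<String, Integer> entry : reducedResults.entrySet()) {
                if (entry.getValue() > bestCount) {
                    best = entry.getKey();
                    bestCount = entry.getValue();
                }
            }
            return best;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> reduced = new HashMap<>();
        reduced.put("hello", 2);
        reduced.put("world", 1);
        System.out.println(new MostFrequentWord().collate(reduced)); // prints hello
    }
}
```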
16.1.1. MapReduceTask
In Red Hat JBoss Data Grid, MapReduceTask is a distributed task, which unifies the Mapper, Combiner, Reducer, and Collator components into a cohesive computation, which can be parallelized and executed across a large-scale cluster.
These components can be specified with a fluent API. However, as most of them are serialized and executed on other nodes, using inner classes is not recommended.
Example 16.4. Specifying MapReduceTask Components
new MapReduceTask(cache)
    .mappedWith(new MyMapper())
    .combinedWith(new MyCombiner())
    .reducedWith(new MyReducer())
    .execute(new MyCollator());
MapReduceTask requires a cache containing data that will be used as input for the task. The JBoss Data Grid execution environment will instantiate and migrate instances of provided Mappers and Reducers seamlessly across the nodes.
By default, all available key/value pairs of a specified cache will be used as input data for the task. This can be modified by using the onKeys method as an input key filter.
There are two MapReduceTask constructor parameters that determine how the intermediate values are processed:
- distributedReducePhase - When set to false, the default setting, the reducers are only executed on the master node. If set to true, the reducers are executed on every node in the cluster.
- useIntermediateSharedCache - Only important if distributedReducePhase is set to true. If true, which is the default setting, this task will share intermediate value cache with other executing MapReduceTasks on the grid. If set to false, this task will use its own dedicated cache for intermediate values.
Note
The default timeout for MapReduceTask is 0 (zero). That is, the task will wait indefinitely for its completion by default.
16.1.2. Specify the Target Cache
Red Hat JBoss Data Grid's MapReduce implementation allows users to specify a target cache to store the results of an executed task. The results are available after the execute method (which is synchronous) is complete.
This variant of the execute method prevents the master JVM node from exceeding its allowed maximum heap size. This is especially relevant if objects that are the results of the reduce phase have a large memory footprint or if multiple MapReduceTasks are concurrently executing on the master task node.
Use the following method of MapReduceTask to specify a Cache object to store the results:
public void execute(Cache<KOut, VOut> resultsCache) throws CacheException
Use the following method of MapReduceTask to specify a name for the target cache:
public void execute(String resultsCache) throws CacheException
16.1.3. Mapper and CDI
The Mapper is invoked with appropriate input key/value pairs on an executing node. Red Hat JBoss Data Grid also provides CDI injection of the input cache, which can be used where additional data from the input cache is required to complete the map transformation.
When the Mapper is executed on a JBoss Data Grid executing node, the JBoss Data Grid CDI module provides an appropriate cache reference, which is injected to the executing Mapper. To use the JBoss Data Grid CDI module with Mapper:
- Declare a cache field in Mapper.
- Annotate the cache field with @org.infinispan.cdi.Input.
- Annotate the cache field with the mandatory @Inject annotation.
Example 16.5. Using a CDI Injection
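import java.util.StringTokenizer;
import javax.inject.Inject;
import org.infinispan.Cache;
import org.infinispan.cdi.Input;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.Mapper;

public class WordCountCacheInjecterMapper implements Mapper<String, String, String, Integer> {
    @Inject
    @Input
    private Cache<String, String> cache;

    @Override
    public void map(String key, String value, Collector<String, Integer> collector) {
        // Use the injected cache here if additional input data is needed.
        StringTokenizer tokens = new StringTokenizer(value);
        // Emit a count of 1 for each whitespace-separated token.
        while (tokens.hasMoreTokens()) {
            collector.emit(tokens.nextToken(), 1);
        }
    }
}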