Chapter 17. MapReduce
The Red Hat JBoss Data Grid MapReduce model is an adaptation of Google's MapReduce model.
MapReduce is a programming model used to process and generate large data sets. It is typically used in distributed computing environments where nodes are clustered. In JBoss Data Grid, MapReduce allows transparent distributed processing of large amounts of data across the grid. It does this by performing computations locally where the data is stored whenever possible.
MapReduce uses the two distinct computational phases of map and reduce to process information requests through the data grid. The process occurs as follows:
- The user initiates a task on a cache instance, which runs on a cluster node (the master node).
- The master node receives the task input, divides the task, and sends tasks for map phase execution on the grid.
- Each node executes a Mapper function on its input, and returns intermediate results back to the master node.
  - If the distributedReducePhase parameter is set to true, the map results are inserted in an intermediary cache, rather than being returned to the master node.
  - If a Combiner has been specified with task.combinedWith(combiner), the Combiner is called on the Mapper results and the combiner's results are returned to the master node or inserted in the intermediary cache.
- The master node collects all intermediate results from the map phase and merges all intermediate values associated with the same intermediate key.
  - If the distributedReducePhase parameter is set to true, the merging of the intermediate values is done on each node, as the Mapper or Combiner results are inserted in the intermediary cache. The master node only receives the intermediate keys.
- The master node sends intermediate key/value pairs for reduction on the grid.
  - If the distributedReducePhase parameter is set to false, the reduction phase is executed only on the master node.
- The final results of the reduction phase are returned.
  - If the distributedReducePhase parameter is set to true, the master node running the task receives all results from the reduction phase and returns the final result to the MapReduce task initiator.
  - If a Collator has been specified with task.execute(Collator), the Collator is executed on the reduction phase results, and the Collator result is returned to the task initiator.
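The sequence above can be simulated in a single JVM for a word count. This is a sketch only: it does not use the JBoss Data Grid API, the class name LocalMapReduceSketch is hypothetical, each "node" is a plain map holding its own slice of the input cache, and the optional pre-summing step stands in for a Combiner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-JVM simulation of the map, combine, merge and reduce phases.
// It does NOT use the JBoss Data Grid API.
final class LocalMapReduceSketch {

    // Each element of 'nodes' is the slice of the input cache stored on one node.
    static Map<String, Integer> wordCount(List<Map<String, String>> nodes, boolean useCombiner) {
        // Map phase: every node maps its own entries into intermediate (word, 1) pairs.
        List<Map<String, List<Integer>>> perNode = new ArrayList<>();
        for (Map<String, String> nodeData : nodes) {
            Map<String, List<Integer>> local = new HashMap<>();
            for (String value : nodeData.values()) {
                for (String word : value.split("\\s+")) {
                    if (!word.isEmpty()) {
                        local.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
                    }
                }
            }
            // Optional combine step: pre-sum on the node that produced the values.
            if (useCombiner) {
                local.replaceAll((k, vs) -> List.of(sum(vs)));
            }
            perNode.add(local);
        }
        // Merge step on the master node: group all intermediate values by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, List<Integer>> local : perNode) {
            local.forEach((k, vs) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).addAll(vs));
        }
        // Reduce phase: one final value per intermediate key.
        Map<String, Integer> reduced = new TreeMap<>();
        grouped.forEach((k, vs) -> reduced.put(k, sum(vs)));
        return reduced;
    }

    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }
}
```

For a sum, enabling the combine step changes only how many intermediate values travel to the master node, not the final counts.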
17.1. The MapReduce API
In Red Hat JBoss Data Grid, each MapReduce task has four main components:
- Mapper
- Reducer
- Collator
- MapReduceTask
The Mapper class implementation is a component of MapReduceTask and is invoked once per input cache entry key/value pair. Map is the process of applying a given function to each element of a list and returning a list of results.
Each node in the JBoss Data Grid executes the Mapper on a given cache entry key/value input pair. It then transforms this cache entry key/value pair into an intermediate key/value pair, which is emitted into the provided Collector instance.
public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
   /**
    * Invoked once for each input cache entry KIn,VIn pair.
    */
   void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}
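As an illustration, a word-count Mapper emits one (word, 1) pair per token of the cached value. So that the sketch compiles without JBoss Data Grid on the classpath, it declares local copies of Mapper and Collector; the Collector method emit(key, value) is an assumption about that interface, and ListCollector is a hypothetical helper that simply records what was emitted.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Local stand-ins mirroring the interface shapes; emit(key, value) is assumed.
interface Collector<KOut, VOut> {
    void emit(KOut key, VOut value);
}

interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
    void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}

// Emits (word, 1) for every whitespace-separated word in the cache value.
class WordCountMapper implements Mapper<String, String, String, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public void map(String key, String value, Collector<String, Integer> collector) {
        StringTokenizer tokens = new StringTokenizer(value);
        while (tokens.hasMoreTokens()) {
            collector.emit(tokens.nextToken(), 1);
        }
    }
}

// Hypothetical list-backed Collector that records every emitted pair in order.
class ListCollector<K, V> implements Collector<K, V> {
    final List<Map.Entry<K, V>> emitted = new ArrayList<>();

    @Override
    public void emit(K key, V value) {
        emitted.add(Map.entry(key, value));
    }
}
```

The Mapper only emits; grouping the repeated keys ("to", "be") into one value per key is left to the later phases.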
At this stage, for each output key there may be multiple output values. The multiple values must be reduced to a single value, and this is the task of the Reducer. JBoss Data Grid's distributed execution environment creates one instance of Reducer per execution node.
public interface Reducer<KOut, VOut> extends Serializable {
   /**
    * Combines/reduces all intermediate values for a particular intermediate key to a single value.
    */
   VOut reduce(KOut reducedKey, Iterator<VOut> iter);
}
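Continuing the word-count illustration, a Reducer only has to add up the counts it is given for one key. The interface below is a local copy of the one shown above so that the sketch compiles on its own; WordCountReducer is a hypothetical name.

```java
import java.io.Serializable;
import java.util.Iterator;

// Local stand-in mirroring the Reducer interface shown above.
interface Reducer<KOut, VOut> extends Serializable {
    VOut reduce(KOut reducedKey, Iterator<VOut> iter);
}

// Sums all counts emitted for one word.
class WordCountReducer implements Reducer<String, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer reduce(String reducedKey, Iterator<Integer> iter) {
        int sum = 0;
        while (iter.hasNext()) {
            sum += iter.next();
        }
        return sum;
    }
}
```

Because addition is associative and commutative, partial sums produced on one node can safely be summed again later, so a reducer like this one should also be usable as a Combiner.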
The same Reducer interface is used for Combiners. A Combiner is similar to a Reducer, except that it must be able to work on partial results. The Combiner is executed on the results of the Mapper, on the same node, without considering the other nodes that might have generated values for the same intermediate key.
As Combiners only see a part of the intermediate values, they cannot be used in all scenarios. However, when they can be used, they can significantly reduce network traffic and memory consumption in the intermediate cache.
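A common example of a reducer that cannot be reused unchanged as a Combiner is an average: averaging per-node averages gives the wrong answer when nodes hold different numbers of values. The plain-Java sketch below (hypothetical class CombinerSafety, not part of any data grid API) compares the true mean, the incorrect "mean of means", and the general workaround of combining to (sum, count) pairs and dividing only at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Shows why a Combiner must be able to work on partial results.
// node1 and node2 hold the values one intermediate key received on two nodes.
final class CombinerSafety {

    static double mean(List<Double> xs) {
        double s = 0;
        for (double x : xs) {
            s += x;
        }
        return s / xs.size();
    }

    // Correct: average over all values at once, as a Reducer on the full data would.
    static double trueMean(List<Double> node1, List<Double> node2) {
        List<Double> all = new ArrayList<>(node1);
        all.addAll(node2);
        return mean(all);
    }

    // Incorrect: "combine" each node to its local mean, then average the means.
    static double meanOfMeans(List<Double> node1, List<Double> node2) {
        return mean(List.of(mean(node1), mean(node2)));
    }

    // Combiner-friendly: combine each node to (sum, count), merge, divide at the end.
    static double meanViaSumCount(List<Double> node1, List<Double> node2) {
        double[] a = sumCount(node1);
        double[] b = sumCount(node2);
        return (a[0] + b[0]) / (a[1] + b[1]);
    }

    static double[] sumCount(List<Double> xs) {
        double s = 0;
        for (double x : xs) {
            s += x;
        }
        return new double[] {s, xs.size()};
    }
}
```

With node1 = [1, 2, 3] and node2 = [10], the true mean is 16 / 4 = 4, whereas the mean of the per-node means is (2 + 10) / 2 = 6.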
The Collator coordinates results from Reducers that have been executed on JBoss Data Grid, and assembles a final result that is delivered to the initiator of the MapReduceTask. The Collator is applied to the final map key/value result of MapReduceTask.
public interface Collator<KOut, VOut, R> {
   /**
    * Collates all reduced results and returns R to invoker of distributed task.
    *
    * @return final result of distributed task computation
    */
   R collate(Map<KOut, VOut> reducedResults);
}
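A Collator receives the complete map of reduced key/value results and turns it into whatever single result the task initiator needs. The sketch below returns the n most frequent words; it declares a local interface with a single collate(Map) method so that it compiles on its own, and TopWordsCollator and its tie-breaking rule are hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-in for a Collator with a single collate(Map) method.
interface Collator<KOut, VOut, R> {
    R collate(Map<KOut, VOut> reducedResults);
}

// Hypothetical collator: the n most frequent words, ties broken alphabetically.
class TopWordsCollator implements Collator<String, Integer, List<String>> {
    private final int n;

    TopWordsCollator(int n) {
        this.n = n;
    }

    @Override
    public List<String> collate(Map<String, Integer> reducedResults) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(reducedResults.entrySet());
        // Highest count first; equal counts in alphabetical order of the word.
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }
}
```

The collation runs once, on the node that returns the result to the initiator, so it is a natural place for final sorting or trimming rather than per-key work.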
17.1.1. MapReduceTask
In Red Hat JBoss Data Grid, MapReduceTask is a distributed task that unifies the Mapper, Combiner, Reducer, and Collator components into a cohesive computation, which can be parallelized and executed across a large-scale cluster.
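These components can be specified with a fluent API. However, as most of them are serialized and executed on other nodes, using inner classes is not recommended: an inner class instance holds a reference to its enclosing instance, which would then have to be serialized as well.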
For example:
new MapReduceTask(cache)
   .mappedWith(new MyMapper())
   .combinedWith(new MyCombiner())
   .reducedWith(new MyReducer())
   .execute(new MyCollator());
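The reason inner classes cause trouble can be shown with plain Java serialization. In this sketch TaskBuilder, InnerMapper, and NestedMapper are hypothetical names: the inner class reads a field of its enclosing object, so its instances carry a hidden reference to a TaskBuilder, which is not Serializable, and writing them fails. The static nested class has no such reference and serializes normally.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical enclosing class; deliberately NOT Serializable.
class TaskBuilder {
    String label = "word-count";

    // Non-static inner class: uses 'label', so it keeps a reference to TaskBuilder.this.
    class InnerMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        String owner() {
            return label;
        }
    }

    // Static nested class: no hidden reference to an enclosing instance.
    static class NestedMapper implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Returns true if the object can be written with standard Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Anonymous classes declared inside instance methods behave like the inner class here, so top-level or static nested classes are the safer choice for Mappers, Combiners, Reducers, and Collators.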
MapReduceTask requires a cache containing data that will be used as input for the task. The JBoss Data Grid execution environment will instantiate and migrate instances of provided Mappers and Reducers seamlessly across the nodes.
By default, all available key/value pairs of a specified cache will be used as input data for the task. This can be modified by using the onKeys method as an input key filter.
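The effect of an input key filter can be pictured as selecting a subset of the cache before the map phase. The helper below is a hypothetical local illustration, not the JBoss Data Grid implementation of onKeys: only entries whose keys appear in the filter become Mapper input, whereas without a filter the whole cache is the input.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of an input key filter: only the listed keys
// are handed to the Mapper. Not the data grid's own implementation.
final class KeyFilterSketch {

    static <K, V> Map<K, V> selectInput(Map<K, V> cache, Set<K> keys) {
        Map<K, V> input = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : cache.entrySet()) {
            if (keys.contains(e.getKey())) {
                input.put(e.getKey(), e.getValue());
            }
        }
        return input;
    }
}
```

Restricting the input this way reduces the number of Mapper invocations, which matters when only a small part of a large cache is relevant to the task.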
There are two MapReduceTask constructor parameters that determine how the intermediate values are processed:
- distributedReducePhase - When set to false, the default setting, the reducers are only executed on the master node. If set to true, the reducers are executed on every node in the cluster.
- useIntermediateSharedCache - Only relevant if distributedReducePhase is set to true. If true, which is the default setting, this task will share the intermediate value cache with other executing MapReduceTasks on the grid. If set to false, this task will use its own dedicated cache for intermediate values.
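The two values of distributedReducePhase should produce the same final result and differ only in where the reduction work happens. The sketch below contrasts them on already-grouped intermediate data. ReducePhaseSketch is a hypothetical name, and assigning each key to a node by its hash code is an assumption made for illustration; it is not how JBoss Data Grid decides key ownership.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Contrasts reducing on the master node with reducing on the key-owning nodes.
// Key ownership by hashCode is an illustrative assumption only.
final class ReducePhaseSketch {

    static int sum(List<Integer> vs) {
        int s = 0;
        for (int v : vs) {
            s += v;
        }
        return s;
    }

    // distributedReducePhase = false: the master node reduces every key itself.
    static Map<String, Integer> reduceOnMaster(Map<String, List<Integer>> grouped) {
        Map<String, Integer> out = new TreeMap<>();
        grouped.forEach((k, vs) -> out.put(k, sum(vs)));
        return out;
    }

    // distributedReducePhase = true: each node reduces only the keys it owns,
    // and the master node merges the per-node results.
    static Map<String, Integer> reduceDistributed(Map<String, List<Integer>> grouped, int nodeCount) {
        List<Map<String, Integer>> perNode = new ArrayList<>();
        for (int n = 0; n < nodeCount; n++) {
            perNode.add(new TreeMap<>());
        }
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int owner = Math.floorMod(e.getKey().hashCode(), nodeCount);
            perNode.get(owner).put(e.getKey(), sum(e.getValue()));
        }
        Map<String, Integer> out = new TreeMap<>();
        perNode.forEach(out::putAll);
        return out;
    }
}
```

In the distributed mode each key is reduced exactly once, on one node, so the master node only merges finished values instead of reducing every key itself.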
17.1.2. Mapper and CDI
The Mapper is invoked with appropriate input key/value pairs on an executing node. However, Red Hat JBoss Data Grid also provides CDI injection of the input cache, which can be used when additional data from the input cache is required to complete the map transformation.
When the Mapper is executed on a JBoss Data Grid executing node, the JBoss Data Grid CDI module provides an appropriate cache reference, which is injected into the executing Mapper. To use the JBoss Data Grid CDI module with Mapper:
- Declare a cache field in the Mapper.
- Annotate the cache field with @org.infinispan.cdi.Input.
- Annotate the cache field with the mandatory @Inject annotation.
For example:
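import java.util.StringTokenizer;
import javax.inject.Inject;
import org.infinispan.Cache;
import org.infinispan.cdi.Input;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.Mapper;

public class WordCountCacheInjecterMapper implements Mapper<String, String, String, Integer> {

   @Inject
   @Input
   private Cache<String, String> cache;

   @Override
   public void map(String key, String value, Collector<String, Integer> collector) {
      // use the injected cache here if additional input data is needed
      StringTokenizer tokens = new StringTokenizer(value);
      // emit (word, 1) for every whitespace-separated token
      while (tokens.hasMoreTokens()) {
         collector.emit(tokens.nextToken(), 1);
      }
   }
}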