For in-depth information on various Big Data technologies, check out my free e-book “Introduction to Big Data”.

Distributed parallel computing is not new. Supercomputers have used MPI for years for complex numerical computing. Although MPI provides a comprehensive API for data transfer and synchronization, it is not well suited to big data. Because of the large data size and the shared-nothing architecture required for scalability, data distribution and I/O are critical to big data analytics, yet MPI largely ignores them. On the other hand, many big data analytics tasks are conceptually straightforward and do not need very complicated communication and synchronization mechanisms. Based on these observations, Google invented MapReduce to deal with the issues of how to parallelize the computation, distribute the data, and handle failures.

Overview

In a shared-nothing distributed computing environment, a computation is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge as it minimizes network traffic and increases the overall throughput of the system. Therefore, it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running. With GFS/HDFS, MapReduce provides such a parallel programming framework.

Inspired by the map and reduce functions commonly used in functional programming, a MapReduce program is composed of a Map() procedure that performs a transformation and a Reduce() procedure that takes the shuffled output of Map as input and performs a summarization. More specifically, the user-defined Map function processes a key-value pair to generate a set of intermediate key-value pairs, and the Reduce function aggregates all intermediate values associated with the same intermediate key.
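The canonical example is word count: Map emits (word, 1) for every word it sees, and Reduce sums the counts for each word. Below is a minimal sketch against the Hadoop Java API (the org.apache.hadoop.mapreduce classes); the class names WordCountMapper and WordCountReducer are illustrative, and in practice each class would live in its own file or as a static nested class of the job driver.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: emit (word, 1) for every word in the input value (a line of text).
class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}

// Reduce: sum all counts emitted for the same word.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}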

MapReduce applications are automatically parallelized and executed on a large cluster of commodity machines. During the execution, the Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function. The number of partitions and the partitioning function are specified by the user. Besides partitioning the input data and running the various tasks in parallel, the framework also manages all communication and data transfer, load balancing, and fault tolerance.

MapReduce offers programmers a very simple parallel computing paradigm. The model is attractive because parallelization is automatic, programs need no explicit handling of data transfer or synchronization, and deadlock cannot occur. MapReduce is also designed to process data that is too big to fit into memory, even combined across all nodes. To achieve that, MapReduce employs a data flow model, which also provides a simple I/O interface for accessing large amounts of data in a distributed file system. It also exploits data locality for efficiency. In most cases, we do not need to worry about I/O at all.

Data Flow

MapReduce Data Flow

For a given job, the MapReduce system runs as follows:

  • Prepare the Map() input

The system splits the input files into M pieces and then starts up M Map workers on a cluster of machines.

  • Run the user-defined Map() code

The map worker parses key-value pairs out of its assigned split and passes each pair to the user-defined Map function. The intermediate key-value pairs produced by the Map function are buffered in memory. Periodically, the buffered pairs are written to local disk and partitioned into R regions by the partitioning function (the partitioner), which takes a key and the number of reducers R and returns the index of the reducer that should receive that key (see the partitioner sketch after this list).

  • Shuffle the Map output to the Reduce processors

When ready, a reduce worker remotely reads the buffered data from the local disks of the map workers. Once a reduce worker has read all of its intermediate data, it sorts the data by the intermediate keys so that all occurrences of the same key are grouped together. Typically, many different keys map to the same reduce task.

  • Run the user-defined Reduce() code

The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function.

  • Produce the final output

The final output is available in the R output files (one per reduce task).
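The partitioner mentioned above is often just a hash of the key modulo R, which is essentially what Hadoop's default HashPartitioner does. Here is a sketch in that spirit (the class name HashLikePartitioner and the Text/IntWritable key-value types are illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Maps an intermediate key to one of the R reduce partitions. Masking with
// Integer.MAX_VALUE keeps the index non-negative even if hashCode() is negative.
class HashLikePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Any partitioner with the same signature can be plugged in instead, e.g. one that partitions by key ranges to produce globally sorted output.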

Optionally, a combiner can be used between map and reduce as an optimization. The combiner function runs on the output of the map phase and acts as a filtering or aggregating step that reduces the amount of data passed to the reducers. In most cases the reducer class is also set as the combiner class, which saves network bandwidth. Note that this works only if the Reduce function is commutative and associative.
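Continuing the illustrative word-count sketch from above, wiring in a combiner is a single call in the job driver; the reducer can double as the combiner because summing counts is commutative and associative. The driver below is likewise a sketch, not the only way to configure a job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        // Reuse the reducer as the combiner: partial sums computed on the map
        // side yield the same final result as summing everything in the reducer.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}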

In practice, one should pay attention to the task granularity, i.e. the number of map tasks M and the number of reduce tasks R. In general, M should be much larger than the number of nodes in the cluster, which improves load balancing and speeds up recovery from worker failures. The right level of parallelism for maps seems to be around 10-100 maps per node (perhaps more for very CPU-light map tasks). Besides, task setup takes a while: on a Hadoop cluster of 100 nodes, it takes about 25 seconds until all nodes are executing the job, so it is best if each map takes at least a minute to execute. In Hadoop, one can call JobConf.setNumMapTasks(int) to set the number of map tasks. Note that it only provides a hint to the framework.

The number of reducers is usually a small multiple of the number of nodes. The right factor seems to be 0.95 for data that is well balanced per intermediate key, or 1.75 otherwise for better load balancing. Note that a few reduce slots are reserved for speculative and failed tasks. In Hadoop we can set the number of reduce tasks with JobConf.setNumReduceTasks(int), and the framework will honor it. It is fine to set R to zero if no reduction is desired.
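A small sketch of these settings with the JobConf API referenced above (the helper name TaskGranularityExample and the cluster numbers are assumptions for illustration only):

import org.apache.hadoop.mapred.JobConf;

public class TaskGranularityExample {
    // Illustrative sizing: e.g. 100 nodes with 10 reduce slots each.
    public static JobConf configure(JobConf conf, int nodes, int reduceSlotsPerNode) {
        // Only a hint: the actual number of map tasks is driven by the input splits.
        conf.setNumMapTasks(nodes * 10);
        // Honored by the framework: about 0.95 of the total reduce slots for
        // well-balanced intermediate keys (use about 1.75 for skewed data).
        conf.setNumReduceTasks((int) (0.95 * nodes * reduceSlotsPerNode));
        return conf;
    }
}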

Secondary Sorting

The output of the mappers is first sorted by the intermediate keys. Sometimes, however, we also want to sort the intermediate values (or some fields of the intermediate values), e.g. when calculating a stock price moving average, where the key is the stock ticker and the value is a pair of timestamp and stock price. If the values of a given key are sorted by timestamp, we can easily calculate the moving average with a sliding window over the values. This problem is called secondary sorting.

A direct approach to secondary sorting is for the reducer to buffer all of the values for a given key and do an in-memory sort. Unfortunately, it may cause the reducer to run out of memory.

Alternatively, we may use a composite key with multiple parts. To calculate the moving average, we may create a composite key of (ticker, timestamp) and provide a customized sort comparator (a subclass of WritableComparator) that compares by ticker and then by timestamp. To ensure that only the ticker (the natural key) is considered when deciding which reducer receives a record, we need a custom partitioner (a subclass of Partitioner) based solely on the natural key. Once the data reaches a reducer, it is grouped by key; since we have a composite key, we must make sure records are grouped solely by the natural key by implementing a group comparator (another subclass of WritableComparator) that considers only the natural key. The sketch below puts these pieces together.
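The following is a minimal sketch of these pieces for the moving-average example; the class names StockKey, StockPartitioner and StockGroupComparator are illustrative, and here the ticker-then-timestamp ordering comes from the composite key's compareTo, which plays the role of the sort comparator.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key: the natural key (ticker) plus the field to sort values by (timestamp).
class StockKey implements WritableComparable<StockKey> {
    String ticker = "";
    long timestamp;

    void set(String ticker, long timestamp) {
        this.ticker = ticker;
        this.timestamp = timestamp;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(ticker);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        ticker = in.readUTF();
        timestamp = in.readLong();
    }

    // Sort order: by ticker first, then by timestamp.
    public int compareTo(StockKey other) {
        int cmp = ticker.compareTo(other.ticker);
        return cmp != 0 ? cmp : Long.compare(timestamp, other.timestamp);
    }
}

// Partition solely on the natural key so that all records of a ticker
// reach the same reducer regardless of timestamp.
class StockPartitioner extends Partitioner<StockKey, Object> {
    @Override
    public int getPartition(StockKey key, Object value, int numReduceTasks) {
        return (key.ticker.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

// Group comparator: the reducer groups values by ticker only, ignoring the
// timestamp, so one reduce() call sees all prices of a ticker in timestamp order.
class StockGroupComparator extends WritableComparator {
    protected StockGroupComparator() {
        super(StockKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((StockKey) a).ticker.compareTo(((StockKey) b).ticker);
    }
}

In the job driver, these would be wired up with job.setPartitionerClass(StockPartitioner.class) and job.setGroupingComparatorClass(StockGroupComparator.class).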
