For in-depth information on various Big Data technologies, check out my free e-book “Introduction to Big Data”.

In the Distributed NoSQL series, I reviewed several popular open-source NoSQL solutions. With big data at hand, we now want to crunch those numbers. Given the sheer data size, we have to use distributed parallel computing frameworks, and in this series I will go through several of them. Naturally, MapReduce is our first topic, as it kicked off so-called big data analytics.

Distributed parallel computing is not new. Supercomputers have used MPI for years for complex numerical computing. Although MPI provides a comprehensive API for data transfer and synchronization, it is not well suited to big data. Because of the large data size and the shared-nothing architecture required for scalability, data distribution and I/O are critical to big data analytics, yet MPI largely ignores them (MPI-I/O was added in MPI-2, but it is not easy to use and it is difficult to achieve good performance with it). On the other hand, many big data analytics tasks are conceptually straightforward and do not need complicated communication and synchronization mechanisms. Based on these observations, Google invented MapReduce to deal with the issues of how to parallelize the computation, distribute the data, and handle failures.

For a given task, the MapReduce system runs as follows:

Prepare the Map() input
The system splits the input files into M pieces (map tasks) and then starts up map workers on a cluster of machines.
Run the user-defined Map() code
The Map worker parses key-value pairs out of its assigned split and passes each pair to the user-defined Map function. The intermediate key-value pairs produced by the Map function are buffered in memory. Periodically, the buffered pairs are written to local disk and partitioned into R regions by the partitioning function, which takes a key and the number of reducers R and returns the index of the desired reducer (a sketch of such a partitioning function follows these steps).
Shuffle the Map output to the Reduce processors
When ready, a reduce worker remotely reads the buffered data from the local disks of the map workers. When a reduce worker has read all of its intermediate data, it sorts the data by the intermediate keys so that all occurrences of the same key are grouped together. Typically, many different keys map to the same reduce task.
Run the user-defined Reduce() code
The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function.
Produce the final output
The final output is available in the R output files (one per reduce task).
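
As a concrete illustration of the partitioning function mentioned above, here is a minimal sketch that mirrors Hadoop's default HashPartitioner: the reducer index is simply the key's hash modulo the number of reducers R.

import org.apache.hadoop.mapreduce.Partitioner;

// A minimal sketch of the partitioning function: route each intermediate key
// to reducer hash(key) mod R. This mirrors Hadoop's default HashPartitioner.
public class HashingPartitioner<K, V> extends Partitioner<K, V> {

  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask the sign bit so the returned index is always in [0, numReduceTasks).
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}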

Optionally, a combiner can be used between the map and reduce phases as an optimization. The combiner function runs on the output of the map phase and acts as a filtering or aggregating step that lessens the amount of data passed to the reducers. In most cases the reducer class is also set as the combiner class so that we can save network bandwidth. Note that this works only if the reduce function is commutative and associative.
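
In Hadoop this is a single line in the job setup. The snippet below is only a sketch: WordCountMapper and WordCountReducer are hypothetical classes whose reduce function sums counts, which is what makes it safe to reuse as the combiner.

// Hypothetical job setup: reuse the reducer as the combiner. This is only valid
// because the reduce function (a sum) is commutative and associative.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setMapperClass(WordCountMapper.class);     // hypothetical mapper emitting (word, 1)
job.setCombinerClass(WordCountReducer.class);  // pre-aggregate map output locally
job.setReducerClass(WordCountReducer.class);   // hypothetical reducer summing the counts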

In practice, one should pay attention to the task granularity, i.e. the number of map tasks M and the number of reduce tasks R. In general, M should be much larger than the number of nodes in the cluster, which improves load balancing and speeds up recovery from worker failures. The right level of parallelism for maps seems to be around 10-100 maps per node (perhaps more for very CPU-light map tasks). Besides, task setup takes a while: on a Hadoop cluster of 100 nodes, it takes about 25 seconds until all nodes are executing the job, so it is best if each map takes at least a minute to execute. In Hadoop, one can call JobConf.setNumMapTasks(int) to set the number of map tasks, although it only provides a hint to the framework.

The number of reducers is usually a small multiple of the number of nodes. The right factor seems to be 0.95 for well-balanced data (per intermediate key) or 1.75 otherwise for better load balancing. Note that we reserve a few reduce slots for speculative tasks and failed tasks. In Hadoop, we can set the number of reduce tasks with JobConf.setNumReduceTasks(int), and the framework will honor it. It is fine to set R to zero if no reduction is desired.
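
Putting the two knobs together, a driver using the old JobConf API might contain something like the following; the numbers are purely illustrative for a hypothetical 100-node cluster with one reduce slot per node, and MyAnalyticsJob is a made-up class name.

// Illustrative task granularity settings for a hypothetical 100-node cluster.
JobConf conf = new JobConf(MyAnalyticsJob.class);
conf.setNumMapTasks(2000);   // ~20 maps per node; only a hint, the InputFormat decides the actual splits
conf.setNumReduceTasks(95);  // 0.95 * 100 reduce slots; this one is honored by the framework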

MapReduce provides programmers with a simple parallel computing paradigm. Inspired by functional programming, the model is easy to understand. Because of automatic parallelization, no explicit handling of data transfer and synchronization in programs, and no deadlock, this model is very attractive. MapReduce is also designed to process very large data sets that are too big to fit into memory (even combined across all nodes). To achieve that, MapReduce employs a data flow model, which also provides a simple I/O interface for accessing large amounts of data in a distributed file system. It also exploits data locality for efficiency. In most cases, we don't need to worry about I/O at all.

Now let’s look into several examples, which will also help us understand the limitations of MapReduce.

Sort

The essential part of the MapReduce framework is a large distributed sort, so we simply let the framework do the job: the map is as simple as emitting the sort key along with the original record, and the reduce is an identity function.
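
A hedged sketch of such a sort mapper in Hadoop's newer API is shown below; the key extraction is a placeholder, and the identity reduce can simply be the base Reducer class, which writes every key-value pair through unchanged.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emit (sort key, original record) and let the framework's shuffle/sort phase
// do the real work. extractKey() is a placeholder for whatever field the
// records should be ordered by.
public class SortMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(LongWritable offset, Text record, Context context)
      throws IOException, InterruptedException {
    context.write(extractKey(record), record);
  }

  private Text extractKey(Text record) {
    // Placeholder: e.g. the first tab-separated field of the record.
    return new Text(record.toString().split("\t", 2)[0]);
  }
}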

Grep

The map function emits a line if it matches a given pattern. Although the reduce part is not necessary in this case, one generally provides an identity function as the reduce function to merge the map output into one final file.

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GrepMapper<K> extends Mapper<K, Text, K, Text> {

  public static final String PATTERN = "mapper.pattern";
  private Pattern pattern;

  // Set up the match pattern from the job configuration.
  @Override
  public void setup(Context context) {
    Configuration conf = context.getConfiguration();
    pattern = Pattern.compile(conf.get(PATTERN));
  }

  // Emit the input line unchanged if it matches the pattern.
  @Override
  public void map(K key, Text value, Context context)
      throws IOException, InterruptedException {
    if (pattern.matcher(value.toString()).find()) {
      context.write(key, value);
    }
  }
}
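
The pattern itself is passed to the mappers through the job configuration. A driver might look like the following sketch; the paths and the regular expression are illustrative only.

// Illustrative grep driver: ship the regex to the mappers via the configuration
// and use the identity Reducer with a single reduce task to merge the output.
Configuration conf = new Configuration();
conf.set(GrepMapper.PATTERN, "XYZ");
Job job = Job.getInstance(conf, "grep");
job.setJarByClass(GrepMapper.class);
job.setMapperClass(GrepMapper.class);
job.setReducerClass(Reducer.class);            // identity reduce
job.setNumReduceTasks(1);                      // one final output file
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/data/t_kv"));
FileOutputFormat.setOutputPath(job, new Path("/data/grep-out"));
System.exit(job.waitForCompletion(true) ? 0 : 1);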

In a relational database, one can achieve this by the following simple query in SQL.

SELECT * FROM T_KV WHERE value LIKE '%XYZ%';

Although this query requires a full table scan, a parallel DBMS can easily outperform MapReduce in this case, because the setup cost of MapReduce is high and the reduce task has to open and combine many map output files.

The performance gap will be much larger in cases where an index can be used, such as

SELECT * FROM T_PERSON WHERE age > 30;

Aggregation

Aggregation is a simple analytic calculation such as counting the number of accesses or users from different countries. Word count, the hello-world program of MapReduce, is an example of aggregation. In SQL, it simply means GROUP BY:

SELECT country, count(*) FROM T_WEB_LOG GROUP BY country;
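
A hedged MapReduce equivalent of this query is sketched below. The country extraction is a placeholder for however the web log is actually formatted, and the two classes would live in separate source files.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map each log record to (country, 1). extractCountry() is a placeholder for
// parsing the country field out of a log line.
public class CountryCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

  private static final LongWritable ONE = new LongWritable(1);

  @Override
  public void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(extractCountry(line), ONE);
  }

  private Text extractCountry(Text line) {
    // Placeholder: assume the country is the third tab-separated field.
    return new Text(line.toString().split("\t")[2]);
  }
}

// Sum the counts per country. Since addition is commutative and associative,
// this class can also serve as the combiner.
public class CountryCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

  @Override
  public void reduce(Text country, Iterable<LongWritable> counts, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable count : counts) {
      sum += count.get();
    }
    context.write(country, new LongWritable(sum));
  }
}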

With a combiner, aggregation in MapReduce works pretty much the same as in a parallel DBMS. Of course, a DBMS can still benefit a lot from an index on the GROUP BY field.

Join

A join combines records from two or more data sets by a common key. There are several ways to implement joins in MapReduce. A straightforward approach is the reduce-side join, which takes advantage of the fact that identical keys are sent to the same reducer. In practice, join, aggregation, and sort are frequently used together, e.g. finding the client of the ad that generates the most revenue (or clicks) during a period. In MapReduce, this has to be done in multiple phases. The first phase filters the data based on the click timestamp and joins the client and click log data sets. The second phase does the aggregation on the output of the join, and the third one finishes the task by sorting the output of the aggregation.
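
Only the reduce-side join step of such a pipeline is sketched below, assuming both inputs are tab-separated text keyed by client id; the file naming convention and field layout are assumptions for illustration only.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Map side: emit (join key, tagged record) so the reducer knows which data set
// each record came from. The "client*" file naming convention is an assumption.
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    String tag = fileName.startsWith("client") ? "C" : "L";  // client table or click log
    String joinKey = line.toString().split("\t", 2)[0];      // assume the key is the first field
    context.write(new Text(joinKey), new Text(tag + "\t" + line));
  }
}

// Reduce side: all records sharing a key arrive at the same reducer, so buffer
// the records from each side and emit their combinations. (In practice a
// secondary sort is used to avoid buffering one side in memory.)
public class JoinReducer extends Reducer<Text, Text, Text, Text> {

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    List<String> clients = new ArrayList<>();
    List<String> clicks = new ArrayList<>();
    for (Text value : values) {
      String[] tagged = value.toString().split("\t", 2);
      (tagged[0].equals("C") ? clients : clicks).add(tagged[1]);
    }
    for (String client : clients) {
      for (String click : clicks) {
        context.write(key, new Text(client + "\t" + click));
      }
    }
  }
}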

Various benchmarks show that parallel DBMSs are way faster than MapReduce for joins. Again, an index on the join key is very helpful. More importantly, joins can be done locally on each node if both tables are partitioned by the join key, so that no data transfer is needed before the join.

K-Means Clustering

The k-means clustering is a simple and widely used method that partitions data into k clusters in which each record belongs to the cluster with the nearest mean, serving as a prototype of the cluster. The most common algorithm for k-means clustering is Lloyd's algorithm, which is an iterative method. Each iteration is a MapReduce job, in which the map tasks find the nearest center for every sample and the reduce tasks recalculate the new centers/means of the clusters. This process is repeated until the algorithm converges or reaches the maximum number of iterations. Clearly, MapReduce is not very efficient for iterative algorithms because the input data have to be read again and again in each iteration.
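
Here is a hedged sketch of a single Lloyd iteration as a Hadoop job, assuming points are comma-separated vectors stored one per line and the current centers are passed in through a made-up configuration property (in practice they would come from a side file or the distributed cache).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: assign each point to its nearest center and emit (center index, point).
public class KMeansMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

  private double[][] centers;

  @Override
  public void setup(Context context) {
    // Placeholder: "kmeans.centers" is a hypothetical property holding the current
    // centers as semicolon-separated vectors, e.g. "1.0,2.0;3.5,0.1".
    String[] rows = context.getConfiguration().get("kmeans.centers").split(";");
    centers = new double[rows.length][];
    for (int i = 0; i < rows.length; i++) centers[i] = parse(rows[i]);
  }

  @Override
  public void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    double[] point = parse(line.toString());
    int nearest = 0;
    double best = Double.MAX_VALUE;
    for (int i = 0; i < centers.length; i++) {
      double d = 0;
      for (int j = 0; j < point.length; j++) {
        double diff = point[j] - centers[i][j];
        d += diff * diff;
      }
      if (d < best) { best = d; nearest = i; }
    }
    context.write(new IntWritable(nearest), line);
  }

  private double[] parse(String line) {
    String[] fields = line.split(",");
    double[] x = new double[fields.length];
    for (int i = 0; i < fields.length; i++) x[i] = Double.parseDouble(fields[i]);
    return x;
  }
}

// Reduce: recompute each center as the mean of the points assigned to it.
public class KMeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

  @Override
  public void reduce(IntWritable center, Iterable<Text> points, Context context)
      throws IOException, InterruptedException {
    double[] sum = null;
    long n = 0;
    for (Text point : points) {
      String[] fields = point.toString().split(",");
      if (sum == null) sum = new double[fields.length];
      for (int i = 0; i < fields.length; i++) sum[i] += Double.parseDouble(fields[i]);
      n++;
    }
    StringBuilder newCenter = new StringBuilder();
    for (int i = 0; i < sum.length; i++) {
      if (i > 0) newCenter.append(',');
      newCenter.append(sum[i] / n);
    }
    context.write(center, new Text(newCenter.toString()));
  }
}

The driver then reads the new centers back from HDFS and resubmits the job for the next iteration, which is exactly where the per-iteration setup and materialization costs add up.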

The above examples show that MapReduce is capable of a variety of tasks. On the other hand, they also demonstrate several drawbacks of MapReduce. First of all, MapReduce is slow by design:

  • The high startup cost means that MapReduce is mainly suitable for long-running batch jobs. Even though Hadoop now reuses JVM instances for map and reduce tasks, the startup time is still significant on large clusters.
  • The communication between map and reduce tasks is always done through remote file access, which often dominates the computation cost. Such a pulling strategy is great for fault tolerance, but it results in low performance compared to a push mechanism. Besides, there could be M * R intermediate files; given large M and R, this is certainly a challenge for the underlying file system. With multiple reducers running simultaneously, it is highly likely that some of them will attempt to read from the same map node at the same time, inducing a large number of disk seeks and slowing the effective disk transfer rate.
  • MapReduce provides automatic parallelization, but it is independent of the underlying storage system. It is the application developers’ duty to organize the data, for example building and using indexes, and partitioning and collocating related data sets. Unfortunately, these are not easy tasks in HDFS and MapReduce.
  • Iterative algorithms perform poorly on MapReduce because the input data are read again and again. Data must also be materialized and replicated on the distributed file system between successive jobs.

A major goal of MapReduce is to provide a simple programming model in which application developers need only write the map and reduce parts of the program. In practice, however, programmers have to take care of many things, such as input/output formats, partition functions, comparison functions, combiners, and job configuration, to achieve good performance. As shown in the example, even a very simple grep MapReduce program is fairly long, while the same query in SQL is much shorter and cleaner.

The simple computing model of MapReduce frees us from explicit handling of data transfer and synchronization in programs, and from deadlocks. But it is a limited parallel computing model: for non-trivial algorithms, programmers have to try hard to “MapReducize” them, often in non-intuitive ways.

After years of practice, the community has realized these problems and tried to address them in different ways. For example, Apache Spark aims at speed by keeping data in memory. Apache Pig provides a DSL and Hive provides a SQL dialect on top of MapReduce to ease the programming. Google Dremel/Cloudera Impala target interactive analysis with SQL queries. Microsoft Dryad/Apache Tez provide a more general parallel computing framework that models computations as DAGs. Google Pregel/Apache Giraph tackle computing problems on large graphs. Apache Storm focuses on real-time event processing. We will look into all of them in this series.