For in-depth information on various Big Data technologies, check out my free e-book “Introduction to Big Data”.

An inner join combines two data sets, A and B, to produce a third one containing all record pairs from A and B with matching join-attribute values. The sort-merge join and the hash join are two common algorithms for implementing the join operation in a parallel data flow environment. In a sort-merge join, both A and B are sorted by the join attribute and then compared in sorted order, and the matching pairs are inserted into the output stream. A hash join first builds a hash table of the smaller data set, with the join attribute as the hash key. Then we scan the larger data set and find the relevant rows in the smaller data set by probing the hash table.
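For concreteness, here is a minimal in-memory hash join in plain Java. It assumes tab-separated records whose first field is the join key; the class and method names and the record layout are illustrative, not from any particular library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal in-memory hash join over two lists of tab-separated records,
// where the first field of each record is the join key.
public class HashJoin {

    public static List<String> join(List<String> small, List<String> large) {
        // Build a hash table of the smaller data set, keyed by the join attribute.
        Map<String, List<String>> table = new HashMap<>();
        for (String row : small) {
            String[] fields = row.split("\t", 2);
            table.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(fields[1]);
        }
        // Scan the larger data set and probe the hash table for matches.
        List<String> output = new ArrayList<>();
        for (String row : large) {
            String[] fields = row.split("\t", 2);
            List<String> matches = table.get(fields[0]);
            if (matches != null) {
                for (String m : matches) {
                    output.add(fields[0] + "\t" + m + "\t" + fields[1]);
                }
            }
        }
        return output;
    }
}
```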

There are several ways to implement a join in MapReduce, e.g. the reduce-side join and the map-side join. The reduce-side join is a straightforward approach that takes advantage of the fact that identical keys are sent to the same reducer. In the reduce-side join, the output key of the Mapper has to be the join key so that matching records reach the same reducer. The Mapper also tags each record with the identity of its dataset so that the reducer can tell the datasets apart. With secondary sorting on the dataset identity, we ensure the order in which values arrive at the reducer, which then generates the matched pairs for each join key. Because the two datasets are usually in different formats, we can use the class MultipleInputs to set up a different InputFormat and Mapper for each input path. The reduce-side join belongs to the sort-merge join family and scales very well to large datasets. However, it may be less efficient in the case of data skew, or when one dataset is significantly smaller than the other.
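The following is a simplified sketch of a reduce-side join with the Hadoop MapReduce API. For brevity it buffers the rows of each tagged dataset in the reducer instead of using the secondary-sorting trick described above, and it assumes well-formed tab-separated records with the join key in the first field; the tags “A” and “B” and the field layout are illustrative.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class ReduceSideJoin {

    // Each Mapper emits the join key as the output key and tags its value
    // with the dataset it came from, so the reducer can tell them apart.
    public static class TagAMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            context.write(new Text(fields[0]), new Text("A\t" + fields[1]));
        }
    }

    public static class TagBMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            context.write(new Text(fields[0]), new Text("B\t" + fields[1]));
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> aRows = new ArrayList<>();
            List<String> bRows = new ArrayList<>();
            for (Text v : values) {
                String[] tagged = v.toString().split("\t", 2);
                (tagged[0].equals("A") ? aRows : bRows).add(tagged[1]);
            }
            // Emit the cross product of matching rows for this join key.
            for (String a : aRows) {
                for (String b : bRows) {
                    context.write(key, new Text(a + "\t" + b));
                }
            }
        }
    }

    // MultipleInputs lets each input path use its own InputFormat and Mapper.
    public static void configure(Job job, Path inputA, Path inputB) {
        MultipleInputs.addInputPath(job, inputA, TextInputFormat.class, TagAMapper.class);
        MultipleInputs.addInputPath(job, inputB, TextInputFormat.class, TagBMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    }
}
```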

If one dataset is small enough to fit into memory, we may use the memory-based map-side join. In this approach, each Mapper side-loads the smaller dataset and builds a hash table of it during setup, and then processes the rows of the larger dataset one by one in the map function. To load the smaller dataset efficiently in every Mapper, we should use the DistributedCache, a facility for caching large, read-only, application-specific files. An application specifies the files to be cached by Job.addCacheFile(URI), and the MapReduce framework copies the necessary files onto a slave node before any tasks for the job are executed on that node. This is much more efficient than copying the files for each Mapper. Besides, we can declare the hash table as a static field so that tasks running successively in the same JVM share the data through the task JVM reuse feature. Thus, the data needs to be loaded only once per JVM.
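A sketch of such a Mapper is shown below, assuming the smaller dataset is a tab-separated file registered with the distributed cache under the hypothetical name small.txt (the file name, paths, and record layout are illustrative). The cached file is read from the task’s working directory, where the framework links it.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A memory-based map-side join: the smaller dataset is loaded into a
// hash table in setup(), and the larger dataset is streamed through map().
public class MapSideJoinMapper extends Mapper<Object, Text, Text, Text> {

    // Static so that tasks reusing this JVM share the already-loaded table.
    private static Map<String, String> smallTable;

    @Override
    protected void setup(Context context) throws IOException {
        if (smallTable != null) {
            return; // already loaded by a previous task in this JVM
        }
        smallTable = new HashMap<>();
        // Cached files are linked into the task's working directory.
        try (BufferedReader reader = new BufferedReader(new FileReader("small.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t", 2);
                smallTable.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t", 2);
        String match = smallTable.get(fields[0]);
        if (match != null) {
            context.write(new Text(fields[0]), new Text(match + "\t" + fields[1]));
        }
    }

    // In the driver, register the small dataset with the distributed cache,
    // using a URI fragment to control the link name in the working directory:
    //   job.addCacheFile(new URI("/data/small.txt#small.txt"));
}
```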

The above map-side join is fast but only works when the smaller dataset fits in memory. To avoid this pitfall, we can use the multi-phase map-side join. First, we run a MapReduce job on each dataset that uses the join attribute as the Mapper’s and Reducer’s output key, with the same number of reducers for all datasets. In this way, all datasets end up sorted by the join attribute and partitioned into the same number of partitions. In the second phase, we use CompositeInputFormat as the input format. CompositeInputFormat performs joins over a set of data sources that are sorted and partitioned the same way, which is guaranteed by the first phase. So the records are already merged before they reach the Mapper, which simply outputs the joined records to the output stream.
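A minimal driver configuration for the second phase might look like the following, assuming the first phase wrote both datasets in a layout readable by KeyValueTextInputFormat; the paths and the choice of an “inner” join expression are assumptions for illustration.

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;

public class CompositeJoin {

    // Configure a map-only job that merges two pre-sorted, identically
    // partitioned data sources with CompositeInputFormat.
    public static void configure(Job job, Path sortedA, Path sortedB) {
        job.setInputFormatClass(CompositeInputFormat.class);
        // "inner" requests an inner join over the two sources; both must be
        // sorted and partitioned the same way (guaranteed by the first phase).
        job.getConfiguration().set(
            CompositeInputFormat.JOIN_EXPR,
            CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class,
                                         sortedA, sortedB));
        job.setMapperClass(JoinMapper.class);
        job.setNumReduceTasks(0); // map-only: the join happens before map()
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    }

    // Records arrive already merged as a TupleWritable, one entry per source.
    public static class JoinMapper extends Mapper<Text, TupleWritable, Text, Text> {
        @Override
        protected void map(Text key, TupleWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(value.get(0) + "\t" + value.get(1)));
        }
    }
}
```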
