For in-depth information on various Big Data technologies, check out my free e-book “Introduction to Big Data”.

We have reviewed Apache Hive and Cloudera Impala, which are great for ad hoc analysis of big data. Today, Facebook’s Hive data warehouse holds 300 PB of data with an incoming daily rate of about 600 TB! That is amazing, but it doesn’t mean that most analytics runs at that scale (even at Facebook). In fact, queries usually focus on a particular subset or time window and touch only a small number of a table’s columns.

In the paper “Nobody ever got fired for using Hadoop on a cluster”, Microsoft Research analyzed 174,000 jobs submitted to a production analytics cluster at Microsoft in a single month in 2011 and found that the median job input data set size was less than 14 GB. They also estimated that the median input data size of the Hadoop jobs on the production clusters at Yahoo is less than 12.5 GB. A 2012 paper from Facebook revealed that Facebook jobs follow a power-law distribution, with small jobs dominating; from the graphs in the paper, it appears that at least 90% of the jobs have input sizes under 100 GB. The input sizes of today’s jobs are surely bigger, but many of them should still fit comfortably into the main memory of a cluster. Therefore, in-memory computation makes a lot of sense for interactive analytics at this scale. As Spark gains popularity, there are several efforts to build SQL on top of it.

The first attempt was Shark, started three years ago. Shark is built on the Hive codebase. It uses the Hive query compiler to parse a HiveQL query and generate an abstract syntax tree, which is then turned into a logical plan with some basic optimizations. Shark then applies additional optimizations, creates a physical plan of RDD operations, and executes them on Spark.

This sounds straightforward, but a naive implementation may be inefficient, and the Shark team did an excellent job of ensuring high performance. First of all, Shark implements a columnar memory store on top of Spark’s native memory store to reduce the large memory overhead of JVM objects. Shark stores all columns of primitive types as JVM primitive arrays, while complex data types such as map and array are serialized and concatenated into a single byte array. Because each column is then only one JVM object, the garbage collection overhead drops considerably. Shark also tries to maximize the throughput of distributed data loading: each data loading task decides whether each column in a partition should be compressed, and chooses the best compression scheme for each partition rather than conforming to a global scheme. Moreover, Shark implements a cost-based query optimizer that selects a more efficient join order based on table and column statistics. The statistics may be computed manually with Hive’s ANALYZE TABLE statement and stored in the metastore; otherwise, Shark collects them when creating and caching an RDD.
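To make the two storage ideas above more concrete, here is a toy Python model of a single cached partition. Python’s array module stands in for JVM primitive arrays (one packed object per primitive column), complex values are serialized and appended to one shared byte buffer, and each column of each partition gets its own encoding decision. Everything here is an assumption for illustration: the names ColumnarPartition and encode_primitive_column, the JSON serialization, and the simple “use run-length encoding only if it saves slots” rule are not Shark’s actual code or heuristics.

```python
import json
from array import array

# Typecodes for Python's array module: "q" = signed 64-bit int, "d" = double.
PRIMITIVE_TYPECODES = {int: "q", float: "d"}


def run_length_encode(values):
    """Collapse consecutive repeats into [value, count] runs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs


def encode_primitive_column(values, typecode):
    """Pick an encoding for this column of this partition only.

    Assumed heuristic: use RLE when storing (value, count) pairs takes fewer
    slots than storing every value; otherwise keep one packed array.
    """
    runs = run_length_encode(values)
    if 2 * len(runs) < len(values):
        run_values = array(typecode, [v for v, _ in runs])
        run_counts = array("q", [c for _, c in runs])
        return "rle", (run_values, run_counts)
    return "plain", array(typecode, values)


class ColumnarPartition:
    """Toy column-oriented partition: one container per column, not one object per row."""

    def __init__(self, schema, rows):
        self.columns = {}
        for i, (name, col_type) in enumerate(schema):
            values = [row[i] for row in rows]
            if col_type in PRIMITIVE_TYPECODES:
                typecode = PRIMITIVE_TYPECODES[col_type]
                self.columns[name] = encode_primitive_column(values, typecode)
            else:
                # Complex values (lists, dicts): serialize each one and append it to
                # a single byte buffer, recording where each value ends.
                buf, offsets = bytearray(), [0]
                for v in values:
                    buf += json.dumps(v).encode("utf-8")
                    offsets.append(len(buf))
                self.columns[name] = ("serialized", (buf, offsets))

    def column(self, name):
        """Decode a single column; the other columns are never touched."""
        encoding, data = self.columns[name]
        if encoding == "rle":
            run_values, run_counts = data
            return [v for v, c in zip(run_values, run_counts) for _ in range(c)]
        if encoding == "plain":
            return list(data)
        buf, offsets = data
        return [json.loads(bytes(buf[s:e])) for s, e in zip(offsets, offsets[1:])]


# Hypothetical sample data: (day, clicks, tags).
schema = [("day", int), ("clicks", int), ("tags", list)]
rows = [
    (20140701, 5, ["a"]),
    (20140701, 7, []),
    (20140701, 3, ["b", "c"]),
    (20140701, 8, ["a"]),
    (20140702, 9, []),
    (20140702, 1, ["c"]),
]
partition = ColumnarPartition(schema, rows)
encodings = {name: enc for name, (enc, _) in partition.columns.items()}
print(encodings)  # {'day': 'rle', 'clicks': 'plain', 'tags': 'serialized'}
```

Because the encoding is chosen per column inside each partition, another partition of the same table could just as well end up with a plain day column and a run-length encoded clicks column, which is the kind of local decision the paragraph above describes.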

Recently, the Shark team announced that they are ending development of Shark and will focus their resources on Spark SQL. Before diving into Spark SQL, we should note that the Hive community has proposed the Hive on Spark initiative, which will add Spark as a third execution engine to Hive, alongside MapReduce and Tez. Because the implementation may take significant time and resources, the project will take a phased approach.

Back to Spark SQL, which takes a different design approach from Shark. In Shark, Spark is used as a backend engine that users don’t need to know about. Spark SQL, by contrast, is developed as part of Spark itself. Much like using JDBC in Java, Spark SQL allows users to mix SQL with imperative/functional programming. The core of Spark SQL is SchemaRDD, a new type of RDD that has an associated schema. Similar to a table in a traditional relational database, a SchemaRDD can be used in relational queries in addition to supporting the standard RDD functions. A SchemaRDD can be created from an existing RDD using the SQLContext.createSchemaRDD() function (or by implicitly converting an RDD of Scala case classes after importing a SQLContext’s members). A SchemaRDD can also be created by loading data from external sources, e.g. a Parquet file, a JSON dataset, or Hive queries run through HiveContext.
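The workflow described above (derive a schema from a record type, expose the data as a table, query it with SQL, then keep processing the result with ordinary code) can be illustrated without a Spark installation. The sketch below uses Python’s built-in sqlite3 module as a stand-in for Spark SQL: the Person dataclass plays the role of a Scala case class, and register_table is a hypothetical helper that infers the table schema from the dataclass fields. None of this is the Spark API; it only shows the mixing pattern.

```python
import sqlite3
from dataclasses import dataclass, fields


# Plays the role of a Scala case class: its fields define the schema.
@dataclass
class Person:
    name: str
    age: int


# Assumed toy mapping from Python field types to SQL column types.
SQL_TYPES = {str: "TEXT", int: "INTEGER", float: "REAL"}


def register_table(conn, table, records, cls):
    """Hypothetical helper: infer a schema from cls's fields and load records as a table.

    The table name is interpolated directly, so it must come from trusted code.
    """
    cols = [(f.name, SQL_TYPES[f.type]) for f in fields(cls)]
    col_defs = ", ".join(f"{name} {sql_type}" for name, sql_type in cols)
    conn.execute(f"CREATE TABLE {table} ({col_defs})")
    placeholders = ", ".join("?" for _ in cols)
    conn.executemany(
        f"INSERT INTO {table} VALUES ({placeholders})",
        [tuple(getattr(r, name) for name, _ in cols) for r in records],
    )


conn = sqlite3.connect(":memory:")
people = [Person("Alice", 15), Person("Bob", 32), Person("Carol", 17)]
register_table(conn, "people", people, Person)

# Relational step: a declarative SQL query over the registered table.
teenagers = conn.execute(
    "SELECT name, age FROM people WHERE age BETWEEN 13 AND 19 ORDER BY name"
).fetchall()

# Functional step: continue with ordinary Python on the query result.
greetings = [f"Name: {name}" for name, _ in teenagers]
print(greetings)  # ['Name: Alice', 'Name: Carol']
```

In Spark SQL the query result is itself a SchemaRDD, so the functional step would be an RDD transformation such as map rather than a local list comprehension.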

Like Shark, Spark SQL employs an in-memory columnar store. Unlike Shark, however, Spark SQL doesn’t use any of Hive’s query optimizations. Much of the complexity in Hive’s query optimizer exists to address the limitations of MapReduce, and many of those limitations don’t apply to Spark. So Spark SQL comes with a new query optimizer framework, Catalyst.
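Catalyst’s core idea is to represent queries and expressions as trees and to optimize them by repeatedly applying rewrite rules, written in Scala as pattern matches, until the tree stops changing. The toy Python sketch below mimics that idea for arithmetic expressions with two rules, constant folding and removing “+ 0”. The node classes, rule functions, and the optimize driver are hypothetical names chosen for illustration; this is not Catalyst’s code, which operates on full logical plans with many more rules.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Literal, Attribute, Add]


def transform(expr, rule):
    """Rewrite children first (bottom-up), then try the rule on the node itself.

    A rule returns a replacement node, or None when it does not match.
    """
    if isinstance(expr, Add):
        expr = Add(transform(expr.left, rule), transform(expr.right, rule))
    rewritten = rule(expr)
    return expr if rewritten is None else rewritten


def constant_folding(expr):
    """Add(Literal(a), Literal(b)) -> Literal(a + b)."""
    if isinstance(expr, Add) and isinstance(expr.left, Literal) and isinstance(expr.right, Literal):
        return Literal(expr.left.value + expr.right.value)
    return None


def drop_add_zero(expr):
    """x + 0 -> x and 0 + x -> x."""
    if isinstance(expr, Add):
        if expr.right == Literal(0):
            return expr.left
        if expr.left == Literal(0):
            return expr.right
    return None


def optimize(expr, rules, max_iterations=100):
    """Apply every rule repeatedly until the tree stops changing (a fixed point)."""
    for _ in range(max_iterations):
        before = expr
        for rule in rules:
            expr = transform(expr, rule)
        if expr == before:
            break
    return expr


# (x + 0) + (1 + 2)  is simplified to  x + 3
query_expr = Add(Add(Attribute("x"), Literal(0)), Add(Literal(1), Literal(2)))
optimized = optimize(query_expr, [constant_folding, drop_add_zero])
print(optimized)  # Add(left=Attribute(name='x'), right=Literal(value=3))
```

Keeping each rule as a small, independent function is what makes this style extensible: adding an optimization means writing one more rule rather than changing the driver.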

Although still very young, Spark SQL already shows the performance and quality it inherits from Spark. You should definitely keep an eye on it.