For in-depth information on various Big Data technologies, check out my free e-book “Introduction to Big Data”.

MapReduce is a good tool for offline, ad-hoc analytics, which often involves multiple successive jobs. A single MapReduce job essentially performs a group-by aggregation in a massively parallel way. However, its programming model is very low level: custom code has to be written even for simple operations like projection and filtering, and implementing common relational operators such as join is even more tedious and verbose. Several efforts have been devoted to simplifying the development of MapReduce programs by providing high-level DSLs that can be translated into native MapReduce code. Unlike many other projects that bring SQL to Hadoop, Pig provides a procedural (data flow) language, Pig Latin, because it was designed for experienced programmers. Nevertheless, SQL programmers will have no difficulty understanding Pig Latin programs because most statements look very much like SQL clauses.

A Pig Latin program is a sequence of steps, each of which carries out a single, fairly high-level data processing operation, e.g. loading, filtering, or grouping. The input data can be loaded from the file system or HBase with the LOAD operator:

grunt> persons = LOAD 'person.csv' USING PigStorage(',') AS (name: chararray, age:int, address: (street: chararray, city: chararray, state: chararray, zip: int));

where grunt> is the prompt of the Pig console and PigStorage is a built-in deserializer for structured text files. Various deserializers are available, and user-defined functions (UDFs) can be used to parse data in unsupported formats. The AS clause defines a schema that assigns names and types to the fields. Although schemas are optional, programmers are encouraged to use them whenever possible. Note that such a “schema on read” approach is very different from the relational approach, which requires rigid predefined schemas; there is no need to copy or reorganize the data before analyzing it.
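Since the schema is optional, the same file can also be loaded without one. In that case, fields are referred to by position ($0, $1, …). The following sketch assumes the same person.csv file and simply projects its first column:

grunt> raw_persons = LOAD 'person.csv' USING PigStorage(',');
grunt> names_only = FOREACH raw_persons GENERATE $0;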

Pig has a rich data model. The primitive data types are int, long, float, double, chararray, bytearray, boolean, datetime, biginteger, and bigdecimal. The complex data types are tuple, bag (a collection of tuples), and map (a set of key-value pairs). Unlike the relational model, the fields of a tuple can be of any data type, and map values can likewise be of any type (map keys are always of type chararray). In other words, nested data structures are supported.
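For illustration, a schema may declare these complex types directly. The statement below is a hypothetical sketch (the file contact.dat and its layout are assumptions) declaring a bag of phone numbers and a map of free-form properties:

grunt> contacts = LOAD 'contact.dat' AS (name: chararray, phones: bag{t: tuple(phone: chararray)}, props: map[]);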

Once the input data have been specified, a rich set of relational operators is available to transform them. The FOREACH…GENERATE operator, corresponding to the map tasks of MapReduce, produces a new bag by projecting fields, applying functions, and so on.

grunt> flatten_persons = FOREACH persons GENERATE name, age, FLATTEN(address);

where FLATTEN is an operator that removes one level of nesting. With the DESCRIBE operator, we can see the schema difference between persons and flatten_persons:

grunt> DESCRIBE persons;
persons: {name: chararray,age: int,address: (street: chararray,city: chararray,state: chararray,zip: int)}
grunt> DESCRIBE flatten_persons;
flatten_persons: {name: chararray,age: int,address::street: chararray,address::city: chararray,address::state: chararray,address::zip: int}
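Besides projection, FOREACH…GENERATE can also apply functions to fields. A small illustrative sketch using the built-in UPPER function:

grunt> upper_names = FOREACH flatten_persons GENERATE UPPER(name) AS name, age;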

Frequently, we want to filter the data based on some condition.

grunt> adults = FILTER flatten_persons BY age > 18;

Aggregations can be done with the GROUP operator, which corresponds to the reduce tasks of MapReduce.

grunt> grouped_by_state = GROUP flatten_persons BY state;
grunt> DESCRIBE grouped_by_state;
grouped_by_state: {group: chararray,flatten_persons: {(name: chararray,age: int,address::street: chararray,address::city: chararray,address::state: chararray,address::zip: int)}}

The result of a GROUP operation is a relation that includes one tuple per group, and each tuple has two fields:

  • The first field is named “group” and is the same type as the group key.
  • The second field takes the name of the original relation and is type bag.
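Per-group aggregates can then be computed with a FOREACH over the grouped relation. For example, the following sketch uses the built-in COUNT and AVG functions to count persons and average their ages per state (the relation name state_stats is just illustrative):

grunt> state_stats = FOREACH grouped_by_state GENERATE group AS state, COUNT(flatten_persons) AS num_persons, AVG(flatten_persons.age) AS avg_age;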

We can also cogroup two or more relations.

grunt> cogrouped_by_name = COGROUP persons BY name, flatten_persons BY name;
grunt> DESCRIBE cogrouped_by_name;
cogrouped_by_name: {group: chararray,persons: {(name: chararray,age: int,address: (street: chararray,city: chararray,state: chararray,zip: int))},flatten_persons: {(name: chararray,age: int,address::street: chararray,address::city: chararray,address::state: chararray,address::zip: int)}}

In fact, the GROUP and COGROUP operators are identical; both work with one or more relations. For readability, GROUP is used in statements involving a single relation, while COGROUP is used in statements involving two or more relations.

A closely related but different operator is JOIN, which is syntactic sugar for a COGROUP followed by a FLATTEN.

grunt> joined_by_name = JOIN persons BY name, flatten_persons BY name;
grunt> DESCRIBE joined_by_name;
joined_by_name: {persons::name: chararray,persons::age: int,persons::address: (street: chararray,city: chararray,state: chararray,zip: int),flatten_persons::name: chararray,flatten_persons::age: int,flatten_persons::address::street: chararray,flatten_persons::address::city: chararray,flatten_persons::address::state: chararray,flatten_persons::address::zip: int}
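The equivalence can be sketched by expanding the join by hand: flattening both bags of the cogrouped relation above yields the same tuples as the JOIN (empty bags contribute no rows, matching inner-join semantics):

grunt> joined_by_hand = FOREACH cogrouped_by_name GENERATE FLATTEN(persons), FLATTEN(flatten_persons);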

Overall, a Pig Latin program is like a handcrafted query execution plan. In contrast, a SQL-based solution, e.g. Hive, relies on an execution planner to automatically translate SQL statements into an execution plan. Like SQL, Pig Latin has no control structures, but it is possible to embed Pig Latin statements and Pig commands in Python, JavaScript, and Groovy scripts.

When you run the above statements in the Pig console, you will notice that they finish instantaneously. This is because Pig is lazy: no computation has actually happened yet. For example, LOAD doesn’t really read the data; it just returns a handle to a bag/relation. Only when a STORE command is issued does Pig materialize the result of a Pig Latin expression sequence to the file system. Before a STORE command, Pig merely builds a logical plan for every user-defined bag. At the point of a STORE command, the logical plan is compiled into a physical plan (a directed acyclic graph of MapReduce jobs) and executed.
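For example, a statement such as the following (with an arbitrary output path) triggers the compilation and execution of the plan built for the adults relation; DUMP, which prints a relation to the console, triggers execution in the same way:

grunt> STORE adults INTO 'output/adults' USING PigStorage(',');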

It is possible to replace MapReduce with other execution engines in Pig. For example, there are efforts to run Pig on top of Spark. However, is that necessary? Spark already provides many relational operators, and its host language, Scala, makes it easy to write concise and expressive programs.

In summary, Pig Latin is a simple, easy-to-use DSL that makes MapReduce programming a lot easier. Meanwhile, Pig keeps the flexibility of MapReduce to process schemaless data in plain files. There is no need to perform slow and complex ETL tasks before analysis, which makes Pig a great tool for quick ad-hoc analytics such as web log analysis. In the next post, we will discuss Hive, a major competitor of Pig and the project that first brought SQL to Hadoop.
