Chapter 3. Understanding MapReduce
The previous two chapters have discussed the problems that Hadoop allows us to solve, and gave some hands-on experience of running example MapReduce jobs. With this foundation, we will now go a little deeper.
In this chapter we will be:
- Understanding how key/value pairs are the basis of Hadoop tasks
- Learning the various stages of a MapReduce job
- Examining the workings of the map, reduce, and optional combine stages in detail
- Looking at the Java API for Hadoop and using it to develop some simple MapReduce jobs
- Learning about Hadoop input and output
Key/value pairs
We have been talking about operations that process and provide their output in terms of key/value pairs without explaining why. It is time to address that.
What it means
Firstly, we will clarify just what we mean by key/value pairs by highlighting similar concepts in the Java standard library. The java.util.Map interface is the parent of commonly used classes such as HashMap and (through some library backward reengineering) even the original Hashtable.
For any Java Map object, its contents are a set of mappings from a given key of a specified type to a related value of a potentially different type. A HashMap object could, for example, contain mappings from a person's name (String) to his or her birthday (Date).
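As a quick, purely illustrative Java snippet (the class name, people, and dates here are invented and are not part of any Hadoop example), that mapping could look like this:

import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;

public class BirthdayMap
{
    public static void main(String[] args)
    {
        // A key/value structure: a person's name (the key) maps to a birthday (the value).
        Map<String, Date> birthdays = new HashMap<String, Date>();
        birthdays.put("Alice", new GregorianCalendar(1985, Calendar.APRIL, 12).getTime());
        birthdays.put("Bob", new GregorianCalendar(1990, Calendar.NOVEMBER, 2).getTime());

        // The kinds of questions we ask of key/value data: membership, lookup, and the key set.
        System.out.println(birthdays.containsKey("Alice"));
        System.out.println(birthdays.get("Bob"));
        System.out.println(birthdays.keySet());
    }
}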
In the context of Hadoop, we are referring to data that also comprises keys that relate to associated values. This data is stored in such a way that the various values in the data set can be sorted and rearranged across a set of keys. If we are using key/value data, it will make sense to ask questions such as the following:
- Does a given key have a mapping in the data set?
- What are the values associated with a given key?
- What is the complete set of keys?
Think back to WordCount from the previous chapter. We will go into it in more detail shortly, but the output of the program is clearly a set of key/value relationships; for each word (the key), there is a count (the value) of its number of occurrences. Think about this simple example and some important features of key/value data will become apparent, as follows:
- Keys must be unique but values need not be
- Each value must be associated with a key, but a key could have no values (though not in this particular example)
- Careful definition of the key is important; deciding on whether or not the counts are applied with case sensitivity will give different results
Note
Note that we need to define carefully what we mean by keys being unique here. This does not mean the key occurs only once; in our data set we may see a key occur numerous times and, as we shall see, the MapReduce model has a stage where all values associated with each key are collected together. The uniqueness of keys guarantees that if we collect together every value seen for any given key, the result will be an association from a single instance of the key to every value mapped in such a way, and none will be omitted.
Why key/value data?
Using key/value data as the foundation of MapReduce operations allows for a powerful programming model that is surprisingly widely applicable, as can be seen by the adoption of Hadoop and MapReduce across a wide variety of industries and problem scenarios. Much data is either intrinsically key/value in nature or can be represented in such a way. It is a simple model with broad applicability and semantics straightforward enough that programs defined in terms of it can be applied by a framework like Hadoop.
Of course, the data model itself is not the only thing that makes Hadoop useful; its real power lies in how it uses the techniques of parallel execution and divide and conquer discussed earlier. But we need this framework to provide us with a way of expressing our problems that doesn't require us to be experts in the execution mechanics; we want to express the transformations required on our data and then let the framework do the rest. MapReduce, with its key/value interface, provides such a level of abstraction, whereby the programmer only has to specify these transformations and Hadoop handles the complex process of applying them to arbitrarily large data sets.
Some real-world examples
To become less abstract, let's think of some real-world data that takes the form of key/value pairs:
- An address book relates a name (key) to contact information (value)
- A bank account uses an account number (key) to associate with the account details (value)
- The index of a book relates a word (key) to the pages on which it occurs (value)
- On a computer filesystem, filenames (keys) allow access to any sort of data, such as text, images, and sound (values)
These examples are intentionally broad in scope, to encourage you to see that key/value data is not some very constrained model used only in high-end data mining but a very common model that is all around us.
We would not be having this discussion if this was not important to Hadoop. The bottom line is that if the data can be expressed as key/value pairs, it can be processed by MapReduce.
MapReduce as a series of key/value transformations
You may have come across MapReduce described in terms of key/value transformations, in particular the intimidating one looking like this:
{K1,V1} -> {K2, List<V2>} -> {K3,V3}
We are now in a position to understand what this means:
- The input to the map method of a MapReduce job is a series of key/value pairs that we'll call K1 and V1.
- The output of the map method (and hence the input to the reduce method) is a series of keys and an associated list of values that are called K2 and V2. Note that each mapper simply outputs a series of individual key/value pairs; these are combined into a key and list of values during the shuffle stage.
- The final output of the MapReduce job is another series of key/value pairs, called K3 and V3.
These sets of key/value pairs don't have to be different; it would be quite possible to input, say, names and contact details and output the same, with perhaps some intermediary format used in collating the information. Keep this three-stage model in mind as we explore the Java API for MapReduce next. We will first walk through the main parts of the API you will need and then do a systematic examination of the execution of a MapReduce job.
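To tie the notation to something concrete before we do, here is a sketch of how a single line of text would flow through the three stages in a word-counting job (illustrative only; the exact key types used by WordCount are covered later in this chapter):

{0, "this is a test"} -> {this, [1]}, {is, [1]}, {a, [1]}, {test, [1]} -> {this, 1}, {is, 1}, {a, 1}, {test, 1}

Here K1/V1 is a position in the file and the line of text, K2/List<V2> is each word with its list of counts after the shuffle, and K3/V3 is each word with its total.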
Pop quiz – key/value pairs
Q1. The concept of key/value pairs is…
- Something created by and specific to Hadoop.
- A way of expressing relationships we often see but don't think of as such.
- An academic concept from computer science.
Q2. Are username/password combinations an example of key/value data?
- Yes, it's a clear case of one value being associated to the other.
- No, the password is more of an attribute of the username, there's no index-type relationship.
- We'd not usually think of them as such, but Hadoop could still process a series of username/password combinations as key/value pairs.
The Hadoop Java API for MapReduce
Hadoop underwent a major API change in its 0.20 release; that newer API is the primary interface in the 1.0 version we use in this book. Though the prior API was certainly functional, the community felt it was unwieldy and unnecessarily complex in some regards.
The new API, sometimes generally referred to as context objects, for reasons we'll see later, is the future of Java's MapReduce development; and as such we will use it wherever possible in this book. Note that caveat: there are parts of the pre-0.20 MapReduce libraries that have not been ported to the new API, so we will use the old interfaces when we need to examine any of these.
The 0.20 MapReduce Java API
The 0.20 and above versions of MapReduce API have most of the key classes and interfaces either in the org.apache.hadoop.mapreduce
package or its subpackages.
In most cases, the implementation of a MapReduce job will provide job-specific subclasses of the Mapper
and Reducer
base classes found in this package.
Note
We'll stick to the commonly used K1
/ K2
/ K3
/ and so on terminology, though more recently the Hadoop API has, in places, used terms such as KEYIN
/VALUEIN
and KEYOUT
/VALUEOUT
instead. For now, we will stick with K1
/ K2
/ K3
as it helps understand the end-to-end data flow.
The Mapper class
This is a cut-down view of the base Mapper
class provided by Hadoop. For our own mapper implementations, we will subclass this base class and override the specified method as follows:
class Mapper<K1, V1, K2, V2>
{
    void map(K1 key, V1 value, Mapper.Context context)
            throws IOException, InterruptedException
    {..}
}
Although the use of Java generics can make this look a little opaque at first, there is actually not that much going on. The class is defined in terms of the key/value input and output types, and then the map
method takes an input key/value pair in its parameters. The other parameter is an instance of the Context
class that provides various mechanisms to communicate with the Hadoop framework, one of which is to output the results of a map
or reduce
method.
Tip
Notice that the map
method only refers to a single instance of K1
and V1
key/value pairs. This is a critical aspect of the MapReduce paradigm in which you write classes that process single records and the framework is responsible for all the work required to turn an enormous data set into a stream of key/value pairs. You will never have to write map
or reduce
classes that try to deal with the full data set. Hadoop also provides mechanisms through its InputFormat
and OutputFormat
classes that provide implementations of common file formats and likewise remove the need of having to write file parsers for any but custom file types.
There are three additional methods that sometimes may be required to be overridden.
protected void setup( Mapper.Context context) throws IOException, InterruptedException
This method is called once before any key/value pairs are presented to the map
method. The default implementation does nothing.
protected void cleanup( Mapper.Context context) throws IOException, InterruptedException
This method is called once after all key/value pairs have been presented to the map
method. The default implementation does nothing.
protected void run( Mapper.Context context) throws IOException, InterruptedException
This method controls the overall flow of task processing within a JVM. The default implementation calls the setup
method once before repeatedly calling the map
method for each key/value pair in the split, and then finally calls the cleanup
method.
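As a rough sketch of that flow (simplified from the actual Hadoop source, so treat it as illustrative rather than a verbatim copy), the default run method behaves as follows:

public void run(Context context) throws IOException, InterruptedException
{
    // Call setup once, then map for every key/value pair in the split,
    // and finally cleanup once.
    setup(context);
    while (context.nextKeyValue())
    {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}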
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
The Reducer class
The Reducer
base class works very similarly to the Mapper
class, and usually requires only subclasses to override a single reduce
method. Here is the cut-down class definition:
public class Reducer<K2, V2, K3, V3>
{
    void reduce(K2 key, Iterable<V2> values, Reducer.Context context)
            throws IOException, InterruptedException
    {..}
}
Again, notice the class definition in terms of the broader data flow (the reduce
method accepts K2
/V2
as input and provides K3
/V3
as output) while the actual reduce method takes only a single key and its associated list of values. The Context
object is again the mechanism to output the result of the method.
This class also has the setup
, run
, and cleanup
methods with similar default implementations as with the Mapper
class that can optionally be overridden:
protected void setup( Reducer.Context context) throws IOException, InterruptedException
This method is called once before any key/lists of values are presented to the reduce
method. The default implementation does nothing.
protected void cleanup( Reducer.Context context) throws IOException, InterruptedException
This method is called once after all key/lists of values have been presented to the reduce
method. The default implementation does nothing.
protected void run( Reducer.Context context) throws IOException, InterruptedException
This method controls the overall flow of processing the task within the JVM. The default implementation calls the setup method once before repeatedly calling the reduce method for each key and its associated list of values provided to the Reducer class, and then finally calls the cleanup method.
The Driver class
Although our mapper and reducer implementations are all we need to perform the MapReduce job, there is one more piece of code required: the driver that communicates with the Hadoop framework and specifies the configuration elements needed to run a MapReduce job. This involves aspects such as telling Hadoop which Mapper
and Reducer
classes to use, where to find the input data and in what format, and where to place the output data and how to format it. There is an additional variety of other configuration options that can be set and which we will see throughout this book.
There is no default parent Driver class to subclass; the driver logic usually exists in the main method of the class written to encapsulate a MapReduce job. Take a look at the following code snippet as an example driver. Don't worry about how each line works, though you should be able to work out generally what each is doing:
public class ExampleDriver
{
...
    public static void main(String[] args) throws Exception
    {
        // Create a Configuration object that is used to set other options
        Configuration conf = new Configuration();
        // Create the object representing the job
        Job job = new Job(conf, "ExampleJob");
        // Set the name of the main class in the job jarfile
        job.setJarByClass(ExampleDriver.class);
        // Set the mapper class
        job.setMapperClass(ExampleMapper.class);
        // Set the reducer class
        job.setReducerClass(ExampleReducer.class);
        // Set the types for the final output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set input and output file paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Execute the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Given our previous talk of jobs, it is not surprising that much of the setup involves operations on a Job
object. This includes setting the job name and specifying which classes are to be used for the mapper and reducer implementations.
Certain input/output configurations are set and, finally, the arguments passed to the main method are used to specify the input and output locations for the job. This is a very common model that you will see often.
There are a number of default values for configuration options, and we are implicitly using some of them in the preceding class. Most notably, we don't say anything about the file format of the input files or how the output files are to be written. These are defined through the InputFormat
and OutputFormat
classes mentioned earlier; we will explore them in detail later. The default input and output formats are text files that suit our WordCount example. There are multiple ways of expressing the format within text files, as well as particularly optimized binary formats.
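If you prefer to state those defaults explicitly, the driver can do so; this is a small optional sketch using the standard classes from the org.apache.hadoop.mapreduce.lib.input and org.apache.hadoop.mapreduce.lib.output packages:

// Optional: explicitly set the formats that would be used by default anyway.
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);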
A common model for less complex MapReduce jobs is to have the Mapper
and Reducer
classes as inner classes within the driver. This allows everything to be kept in a single file, which simplifies the code distribution.
Writing MapReduce programs
We have been using and talking about WordCount for quite some time now; let's actually write an implementation, compile, and run it, and then explore some modifications.
Time for action – setting up the classpath
To compile any Hadoop-related code, we will need to refer to the standard Hadoop-bundled classes.
Add the hadoop-core-1.0.4.jar
file from the distribution to the Java classpath as follows:
$ export CLASSPATH=.:${HADOOP_HOME}/hadoop-core-1.0.4.jar:${CLASSPATH}
What just happened?
This adds the hadoop-core-1.0.4.jar
file explicitly to the classpath alongside the current directory and the previous contents of the CLASSPATH environment variable.
Once again, it would be good to put this in your shell startup file or a standalone file to be sourced.
Note
We will later need to also have many of the supplied third-party libraries that come with Hadoop on our classpath, and there is a shortcut to do this. For now, the explicit addition of the core JAR file will suffice.
Time for action – implementing WordCount
We have seen the use of the WordCount example program in Chapter 2, Getting Hadoop Up and Running. Now we will explore our own Java implementation by performing the following steps:
- Enter the following code into the
WordCount1.java
file:

import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount1
{

    public static class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
        {
            String[] words = value.toString().split(" ");
            for (String str : words)
            {
                word.set(str);
                context.write(word, one);
            }
        }
    }

    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException
        {
            int total = 0;
            for (IntWritable val : values)
            {
                total++;
            }
            context.write(key, new IntWritable(total));
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount1.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- Now compile it by executing the following command:
$ javac WordCount1.java
What just happened?
This is our first complete MapReduce job. Look at the structure and you should recognize the elements we have previously discussed: the overall Job
class with the driver configuration in its main method and the Mapper
and Reducer
implementations defined as inner classes.
We'll do a more detailed walkthrough of the mechanics of MapReduce in the next section, but for now let's look at the preceding code and think of how it realizes the key/value transformations we talked about earlier.
The input to the Mapper
class is arguably the hardest to understand, as the key is not actually used. The job specifies TextInputFormat as the format of the input data and, by default, this delivers to the mapper data where the key is the byte offset of the line within the file and the value is the text of that line. In reality, you may never actually see a mapper that uses that key, but it is provided.
The mapper is executed once for each line of text in the input source and every time it takes the line and breaks it into words. It then uses the Context
object to output (more commonly known as emitting) each new key/value of the form <word, 1>
. These are our K2
/V2
values.
We said before that the input to the reducer is a key and a corresponding list of values, and there is some magic that happens between the map
and reduce
methods to collect together the values for each key that facilitates this, which we'll not describe right now. Hadoop executes the reducer once for each key and the preceding reducer implementation simply counts the numbers in the Iterable
object and gives output for each word in the form of <word, count>
. This is our K3
/V3
values.
Take a look at the signatures of our mapper and reducer classes: the WordCountMapper class takes Object and Text as input and gives Text and IntWritable as output. The WordCountReducer class takes Text and IntWritable both as input and output. This is again quite a common pattern, where the map method discards or transforms its input key and instead emits a series of data pairs on which the reducer performs aggregation.
The driver is more meaningful here, as we have real values for the parameters. We use arguments passed to the class to specify the input and output locations.
Time for action – building a JAR file
Before we run our job in Hadoop, we must collect the required class files into a single JAR file that we will submit to the system.
Create a JAR file from the generated class files.
$ jar cvf wc1.jar WordCount1*class
What just happened?
We must always package our class files into a JAR file before submitting to Hadoop, be it local or on Elastic MapReduce.
Tip
Be careful with the JAR command and file paths. If you include class files from a subdirectory in a JAR file, the classes may not be stored with the paths you expect. This is especially common when using a catch-all classes directory into which all compiled classes are placed. It may be useful to write a script that changes into that directory, packages the required files into a JAR file, and moves the JAR file to the required location.
Time for action – running WordCount on a local Hadoop cluster
Now we have generated the class files and collected them into a JAR file, we can run the application by performing the following steps:
- Submit the new JAR file to Hadoop for execution.
$ hadoop jar wc1.jar WordCount1 test.txt output
- If successful, you should see the output being very similar to the one we obtained when we ran the Hadoop-provided sample WordCount in the previous chapter. Check the output file; it should be as follows:
$ hadoop fs -cat output/part-r-00000
This    1
Yes     1
a       1
is      2
test    1
this    1
What just happened?
This is the first time we have used the Hadoop JAR command with our own code. There are four arguments:
- The name of the JAR file.
- The name of the driver class within the JAR file.
- The location, on HDFS, of the input file (a relative reference to the /user/hadoop home folder, in this case).
- The desired location of the output folder (again, a relative path).
Tip
The name of the driver class is only required if a main class has not (as in this case) been specified within the JAR file manifest.
Time for action – running WordCount on EMR
We will now show you how to run this same JAR file on EMR. Remember, as always, that this costs money!
- Go to the AWS console at http://aws.amazon.com/console, sign in, and select S3.
- You'll need two buckets: one to hold the JAR file and another for the job output. You can use existing buckets or create new ones.
- Open the bucket where you will store the job file, click on Upload, and add the
wc1.jar
file created earlier. - Return to the main console home page, and then go to the EMR portion of the console by selecting Elastic MapReduce.
- Click on the Create a New Job Flow button and you'll see a familiar screen as shown in the following screenshot:
- Previously, we used a sample application; to run our code, we need to perform different steps. Firstly, select the Run your own application radio button.
- In the Select a Job Type combobox, select Custom JAR.
- Click on the Continue button and you'll see a new form, as shown in the following screenshot:
We now specify the arguments to the job. Within our uploaded JAR file, our code—particularly the driver class—specifies aspects such as the Mapper
and Reducer
classes.
What we need to provide is the path to the JAR file and the input and output paths for the job. In the JAR Location field, put the location where you uploaded the JAR file. If the JAR file is called wc1.jar
and you uploaded it into a bucket called mybucket
, the path would be mybucket/wc1.jar
.
In the JAR Arguments field, you need to enter the name of the main class and the input and output locations for the job. For files on S3, we can use URLs of the form s3://bucketname/objectname.
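For example, if the JAR file were uploaded to the mybucket bucket as before, the JAR Arguments field might contain something like the following (a hypothetical value; substitute your own bucket and object names):

WordCount1 s3://mybucket/input/test.txt s3://mybucket/wordcount-output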
Click on Continue and the familiar screen to specify the virtual machines for the job flow appears, as shown in the following screenshot:
Now continue through the job flow setup and execution as we did in Chapter 2, Getting Hadoop Up and Running.
What just happened?
The important lesson here is that we can reuse the code written on and for a local Hadoop cluster in EMR. Also, besides these first few steps, the majority of the EMR console is the same regardless of the source of the job code to be executed.
Through the remainder of this chapter, we will not explicitly show code being executed on EMR and will instead focus more on the local cluster, because running a JAR file on EMR is very easy.
The pre-0.20 Java MapReduce API
Our preference in this book is for the 0.20 and above versions of MapReduce Java API, but we'll need to take a quick look at the older APIs for two reasons:
- Many online examples and other reference materials are written for the older APIs.
- Several areas within the MapReduce framework are not yet ported to the new API, and we will need to use the older APIs to explore them.
The older API's classes are found primarily in the org.apache.hadoop.mapred
package.
The new API classes use concrete Mapper
and Reducer
classes, while the older API had this responsibility split across abstract classes and interfaces.
An implementation of a Mapper
class will subclass the abstract MapReduceBase
class and implement the Mapper
interface, while a custom Reducer
class will subclass the same MapReduceBase
abstract class but implement the Reducer
interface.
We'll not explore MapReduceBase
in much detail as its functionality deals with job setup and configuration, which aren't really core to understanding the MapReduce
model. But the interfaces of pre-0.20 Mapper
and Reducer
are worth showing:
public interface Mapper<K1, V1, K2, V2>
{
    void map(K1 key, V1 value, OutputCollector<K2, V2> output,
            Reporter reporter) throws IOException;
}

public interface Reducer<K2, V2, K3, V3>
{
    void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output,
            Reporter reporter) throws IOException;
}
There are a few points to understand here:
- The generic parameters to the OutputCollector class show more explicitly how the result of the methods is presented as output.
- The old API used the OutputCollector class for this purpose, and the Reporter class to write status and metrics information to the Hadoop framework. The 0.20 API combines these responsibilities in the Context class.
- The Reducer interface uses an Iterator object instead of an Iterable object; this was changed as the latter works with the Java for-each syntax and makes for cleaner code.
- Neither the map nor the reduce method could throw InterruptedException in the old API.
As you can see, the changes between the APIs alter how MapReduce programs are written but don't change the purpose or responsibilities of mappers or reducers. Don't feel obliged to become an expert in both APIs unless you need to; familiarity with either should allow you to follow the rest of this book.
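To make the old API a little more concrete, here is a sketch of how our WordCount mapper might be ported to the pre-0.20 interfaces (an illustrative port, not code used elsewhere in this chapter):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API equivalent of WordCountMapper: extend MapReduceBase, implement
// the Mapper interface, and emit results through an OutputCollector.
public class OldApiWordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException
    {
        for (String str : value.toString().split(" "))
        {
            word.set(str);
            output.collect(word, one);
        }
    }
}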
Hadoop-provided mapper and reducer implementations
We don't always have to write our own Mapper
and Reducer
classes from scratch. Hadoop provides several common Mapper
and Reducer
implementations that can be used in our jobs. If we don't override any of the methods in the Mapper
and Reducer
classes in the new API, the default implementations are the identity Mapper
and Reducer
classes, which simply output the input unchanged.
Note that more such prewritten Mapper
and Reducer
implementations may be added over time, and currently the new API does not have as many as the older one.
The mappers are found in the org.apache.hadoop.mapreduce.lib.map
package, and include the following:
- InverseMapper: This outputs (value, key)
- TokenCounterMapper: This counts the number of discrete tokens in each line of input
The reducers are found in the org.apache.hadoop.mapreduce.lib.reduce
package, and currently include the following:
- IntSumReducer: This outputs the sum of the list of integer values per key
- LongSumReducer: This outputs the sum of the list of long values per key
Time for action – WordCount the easy way
Let's revisit WordCount, but this time use some of these predefined map
and reduce
implementations:
- Create a new
WordCountPredefined.java
file containing the following code:import org.apache.hadoop.conf.Configuration ; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper ; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer ; public class WordCountPredefined { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "word count1"); job.setJarByClass(WordCountPredefined.class); job.setMapperClass(TokenCounterMapper.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
- Now compile, create the JAR file, and run it as before.
- Don't forget to delete the output directory before running the job, if you want to use the same location. Use the
hadoop fs -rmr
output command, for example.
What just happened?
Given the ubiquity of WordCount as an example in the MapReduce world, it's perhaps not entirely surprising that there are predefined Mapper
and Reducer
implementations that together realize the entire WordCount solution. The TokenCounterMapper
class simply breaks each input line into a series of (token, 1)
pairs and the IntSumReducer
class provides a final count by summing the number of values for each key.
There are two important things to appreciate here:
- Though WordCount was doubtless an inspiration for these implementations, they are in no way specific to it and can be widely applicable
- This model of having reusable mapper and reducer implementations is one thing to remember, especially in combination with the fact that often the best starting point for a new MapReduce job implementation is an existing one
Walking through a run of WordCount
To explore the relationship between mapper and reducer in more detail, and to expose some of Hadoop's inner working, we'll now go through just how WordCount (or indeed any MapReduce job) is executed.
Startup
The call to Job.waitForCompletion()
in the driver is where all the action starts. The driver is the only piece of code that runs on our local machine, and this call starts the communication with the JobTracker. Remember that the JobTracker is responsible for all aspects of job scheduling and execution, so it becomes our primary interface when performing any task related to job management. The JobTracker communicates with the NameNode on our behalf and manages all interactions relating to the data stored on HDFS.
Splitting the input
The first of these interactions happens when the JobTracker looks at the input data and determines how to assign it to map tasks. Recall that HDFS files are, by default, split into blocks of 64 MB and the JobTracker will assign each block to one map task.
Our WordCount example, of course, used a trivial amount of data that was well within a single block. Picture a much larger input file measured in terabytes, and the split model makes more sense. Each segment of the file—or split, in MapReduce terminology—is processed uniquely by one map task.
Once it has computed the splits, the JobTracker places them and the JAR file containing the Mapper
and Reducer
classes into a job-specific directory on HDFS, whose path will be passed to each task as it starts.
Task assignment
Once the JobTracker has determined how many map tasks will be needed, it looks at the number of hosts in the cluster, how many TaskTrackers are working, and how many map tasks each can concurrently execute (a user-definable configuration variable). The JobTracker also looks to see where the various input data blocks are located across the cluster and attempts to define an execution plan that maximizes the cases when a TaskTracker processes a split/block located on the same physical host, or, failing that, it processes at least one in the same hardware rack.
This data locality optimization is a huge reason behind Hadoop's ability to efficiently process such large datasets. Recall also that, by default, each block is replicated across three different hosts, so the likelihood of producing a task/host plan that sees most blocks processed locally is higher than it may seem at first.
Task startup
Each TaskTracker then starts up a separate Java virtual machine to execute the tasks. This does add a startup time penalty, but it isolates the TaskTracker from problems caused by misbehaving map or reduce tasks, and the JVM can be configured to be reused by subsequently executed tasks.
If the cluster has enough capacity to execute all the map tasks at once, they will all be started and given a reference to the split they are to process and the job JAR file. Each TaskTracker then copies the split to the local filesystem.
If there are more tasks than the cluster capacity, the JobTracker will keep a queue of pending tasks and assign them to nodes as they complete their initially assigned map tasks.
We are now ready to see the map tasks processing their data. If this all sounds like a lot of work, it is; and it explains why when running any MapReduce job, there is always a non-trivial amount of time taken as the system gets started and performs all these steps.
Ongoing JobTracker monitoring
The JobTracker doesn't just stop work now and wait for the TaskTrackers to execute all the mappers and reducers. It is constantly exchanging heartbeat and status messages with the TaskTrackers, looking for evidence of progress or problems. It also collects metrics from the tasks throughout the job execution, some provided by Hadoop and others specified by the developer of the map and reduce tasks, though we don't use any in this example.
Mapper input
Our earlier invocation of WordCount used a trivially small text file. For the rest of this walkthrough, let's assume it was a not-much-less trivial two-line text file:

This is a test
Yes this is

The driver class specifies the format and structure of the input file by using TextInputFormat, and from this Hadoop knows to treat it as text with the byte offset of each line within the file as the key and the line contents as the value. The two invocations of the mapper will therefore be given the following input:

0   This is a test
15  Yes this is
Mapper execution
The key/value pairs received by the mapper are the offset in the file of the line and the line contents respectively because of how the job is configured. Our implementation of the map
method in WordCountMapper
discards the key as we do not care where each line occurred in the file and splits the provided value into words using the split
method on the standard Java String class. Note that better tokenization could be provided by use of regular expressions or the StringTokenizer
class, but for our purposes this simple approach will suffice.
For each individual word, the mapper then emits a key comprised of the actual word itself, and a value of 1.
Tip
We add a few optimizations that we'll mention here, but don't worry too much about them at this point. You will see that we don't create the IntWritable
object containing the value 1 each time, instead we create it as a static variable and re-use it in each invocation. Similarly, we use a single Text
object and reset its contents for each execution of the method. The reason for this is that though it doesn't help much for our tiny input file, the processing of a huge data set would see the mapper potentially called thousands or millions of times. If each invocation potentially created a new object for both the key and value output, this would become a resource issue and likely cause much more frequent pauses due to garbage collection. We use this single value and know the Context.write
method will not alter it.
Mapper output and reduce input
The output of the mapper is a series of pairs of the form (word, 1)
; in our example these will be:
(This, 1), (is, 1), (a, 1), (test, 1), (Yes, 1), (this, 1), (is, 1)
These output pairs from the mapper are not passed directly to the reducer. Between mapping and reducing is the shuffle stage where much of the magic of MapReduce occurs.
Partitioning
One of the implicit guarantees of the Reducer
interface is that a single reducer will be given all the values associated with a given key. With multiple reduce tasks running across a cluster, each mapper output must therefore be partitioned into the separate outputs destined for each reducer. These partitioned files are stored on the local node filesystem.
The number of reduce tasks across the cluster is not as dynamic as that of mappers, and indeed we can specify the value as part of our job submission. Each TaskTracker therefore knows how many reducers are in the cluster and from this how many partitions the mapper output should be split into.
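For reference, the reducer count is set in the driver; a one-line sketch (the value 4 is arbitrary):

// Request four reduce tasks; each mapper's output will then be split into four partitions.
job.setNumReduceTasks(4);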
Note
We'll address failure tolerance in a later chapter, but at this point an obvious question is what happens to this calculation if a reducer fails. The answer is that the JobTracker will ensure that any failed reduce tasks are reexecuted, potentially on a different node so a transient failure will not be an issue. A more serious issue, such as that caused by a data-sensitive bug or very corrupt data in a split will, unless certain steps are taken, cause the whole job to fail.
The optional partition function
Within the org.apache.hadoop.mapreduce
package is the Partitioner
class, an abstract class with the following signature:
public abstract class Partitioner<Key, Value>
{
    public abstract int getPartition(Key key, Value value, int numPartitions);
}
By default, Hadoop will use a strategy that hashes the output key to perform the partitioning. This functionality is provided by the HashPartitioner
class within the org.apache.hadoop.mapreduce.lib.partition
package, but it is necessary in some cases to provide a custom subclass of Partitioner
with application-specific partitioning logic. This would be particularly true if, for example, the data provided a very uneven distribution when the standard hash function was applied.
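As a minimal sketch of such a subclass (a hypothetical example, not used elsewhere in this chapter, and assuming the usual org.apache.hadoop.io and org.apache.hadoop.mapreduce imports), a partitioner could route keys by their first character rather than their hash:

public class FirstLetterPartitioner extends Partitioner<Text, IntWritable>
{
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions)
    {
        if (key.getLength() == 0)
        {
            return 0;
        }
        // Use the first character of the key to choose a partition, always
        // returning a value in the range [0, numPartitions).
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be enabled in the driver with job.setPartitionerClass(FirstLetterPartitioner.class).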
Reducer input
The reducer TaskTracker receives updates from the JobTracker that tell it which nodes in the cluster hold map output partitions which need to be processed by its local reduce task. It then retrieves these from the various nodes and merges them into a single file that will be fed to the reduce task.
Reducer execution
Our WordCountReducer
class is very simple; for each word, it simply counts the number of elements in the value list and emits the final (word, count)
output.
Tip
We don't worry about any sort of optimization to avoid excess object creation here. The number of reduce invocations is typically smaller than the number of mappers, and consequently the overhead is less of a concern. However, feel free to do so if you find yourself with very tight performance requirements.
For our invocation of WordCount on our sample input, all but one word have only one value in the list of values; is
has two.
Note
Note that the word this
and This
had discrete counts because we did not attempt to ignore case sensitivity. Similarly, ending each sentence with a period would have stopped is
having a count of two as is
would be different from is.
. Always be careful when working with textual data; aspects such as capitalization, punctuation, hyphenation, and pagination can skew how the data is perceived. In such cases, it's common to have a precursor MapReduce job that applies a normalization or clean-up strategy to the data set.
Reducer output
The final set of reducer output for our example is therefore:
(This, 1), (Yes, 1), (a, 1), (is, 2), (test, 1), (this, 1)
This data will be output to partition files within the output directory specified in the driver that will be formatted using the specified OutputFormat implementation. Each reduce task writes to a single file with the filename part-r-nnnnn
, where nnnnn
starts at 00000
and is incremented. This is, of course, what we saw in Chapter 2, Getting Hadoop Up and Running; hopefully the part
prefix now makes a little more sense.
Shutdown
Once all tasks have completed successfully, the JobTracker outputs the final state of the job to the client, along with the final aggregates of some of the more important counters that it has been aggregating along the way. The full job and task history is available in the log directory on each node or, more accessibly, via the JobTracker web UI; point your browser to port 50030 on the JobTracker node.
That's all there is to it!
As you've seen, each MapReduce program sits atop a significant amount of machinery provided by Hadoop, and the sketch provided is in many ways a simplification. As before, much of this isn't hugely valuable for such a small example, but never forget that we can use the same software and mapper/reducer implementations to do a WordCount on a much larger data set across a huge cluster, be it local or on EMR. The work that Hadoop does for you at that point is enormous and is what makes it possible to perform data analysis on such data sets; otherwise, the effort to manually implement the distribution, synchronization, and parallelization of the code would be immense.
Apart from the combiner…maybe
There is one additional, and optional, step that we omitted previously. Hadoop allows the use of a combiner class to perform some early aggregation of the output from the map
method before it is retrieved by the reducer.
Why have a combiner?
Much of Hadoop's design is predicated on reducing the expensive parts of a job that usually equate to disk and network I/O. The output of the mapper is often large; it's not infrequent to see it many times the size of the original input. Hadoop does allow configuration options to help reduce the impact of the reducers transferring such large chunks of data across the network. The combiner takes a different approach, where it is possible to perform early aggregation to require less data to be transferred in the first place.
The combiner does not have its own interface; a combiner must have the same signature as the reducer and hence also subclasses the Reducer
class from the org.apache.hadoop.mapreduce
package. The effect of this is to basically perform a mini-reduce on the mapper for the output destined for each reducer.
Hadoop does not guarantee whether the combiner will be executed. At times, it may not be executed at all, while at times it may be used once, twice, or more times depending on the size and number of output files generated by the mapper for each reducer.
Time for action – WordCount with a combiner
Let's add a combiner to our first WordCount example. In fact, let's use our reducer as the combiner. Since the combiner must have the same interface as the reducer, this is something you'll often see, though note that the type of processing involved in the reducer will determine if it is a true candidate for a combiner; we'll discuss this later. Since we are looking to count word occurrences, we can do a partial count on the map node and pass these subtotals to the reducer.
- Copy
WordCount1.java
toWordCount2.java
and change the driver class to add the following line between the definition of theMapper
andReducer
classes:job.setCombinerClass(WordCountReducer.class);
- Also change the class name to
WordCount2
and then compile it.$ javac WordCount2.java
- Create the JAR file.
$ jar cvf wc2.jar WordCount2*class
- Run the job on Hadoop.
$ hadoop jar wc2.jar WordCount2 test.txt output
- Examine the output.
$ hadoop fs -cat output/part-r-00000
What just happened?
This output may not be what you expected, as the value for the word is
is now incorrectly specified as 1 instead of 2.
The problem lies in how the combiner and reducer will interact. The value provided to the reducer, which was previously (is, 1, 1)
, is now (is, 2)
because our combiner did its own summation of the number of elements for each word. However, our reducer does not look at the actual values in the Iterable
object, it simply counts how many are there.
When you can use the reducer as the combiner
You need to be careful when writing a combiner. Remember that Hadoop makes no guarantees on how many times it may be applied to map output; it may be 0, 1, or more times. It is therefore critical that the operation performed by the combiner can safely be applied any number of times. Operations that are commutative and associative, such as summation or finding a maximum, are usually safe, but, as shown previously, ensure the reduce logic isn't making implicit assumptions that might break this property.
Time for action – fixing WordCount to work with a combiner
Let's make the necessary modifications to WordCount to correctly use a combiner.
Copy WordCount2.java
to a new file called WordCount3.java
and change the reduce
method as follows:
public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException
{
    int total = 0;
    for (IntWritable val : values)
    {
        total += val.get();
    }
    context.write(key, new IntWritable(total));
}
Remember to also change the class name to WordCount3
and then compile, create the JAR file, and run the job as before.
What just happened?
The output is now as expected. Any map-side invocations of the combiner perform successfully and the reducer correctly produces the overall output value.
Tip
Would this have worked if the original reducer was used as the combiner and the new reduce implementation as the reducer? The answer is no, though our test example would not have demonstrated it. Because the combiner may be invoked multiple times on the map output data, the same errors would arise in the map output if the dataset was large enough, but didn't occur here due to the small input size. Fundamentally, the original reducer was incorrect, but this wasn't immediately obvious; watch out for such subtle logic flaws. This sort of issue can be really hard to debug as the code will reliably work on a development box with a subset of the data set and fail on the much larger operational cluster. Carefully craft your combiner classes and never rely on testing that only processes a small sample of the data.
Reuse is your friend
In the previous section we took the existing job class file and made changes to it. This is a small example of a very common Hadoop development workflow; use an existing job file as the starting point for a new one. Even if the actual mapper and reducer logic is very different, it's often a timesaver to take an existing working job as this helps you remember all the required elements of the mapper, reducer, and driver implementations.
Pop quiz – MapReduce mechanics
Q1. What do you always have to specify for a MapReduce job?
- The classes for the mapper and reducer.
- The classes for the mapper, reducer, and combiner.
- The classes for the mapper, reducer, partitioner, and combiner.
- None; all classes have default implementations.
Q2. How many times will a combiner be executed?
- At least once.
- Zero or one times.
- Zero, one, or many times.
- It's configurable.
Q3. You have a mapper that for each key produces an integer value and the following set of reduce operations:
- Reducer A: outputs the sum of the set of integer values.
- Reducer B: outputs the maximum of the set of values.
- Reducer C: outputs the mean of the set of values.
- Reducer D: outputs the difference between the largest and smallest values in the set.
Which of these reduce operations could safely be used as a combiner?
- All of them.
- A and B.
- A, B, and D.
- C and D.
- None of them.
Hadoop-specific data types
Up to this point we've glossed over the actual data types used as the input and output of the map and reduce classes. Let's take a look at them now.
The Writable and WritableComparable interfaces
If you browse the Hadoop API for the org.apache.hadoop.io
package, you'll see some familiar classes such as Text
and IntWritable
along with others with the Writable
suffix.
This package also contains the Writable
interface specified as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable
{
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
The main purpose of this interface is to provide mechanisms for the serialization and deserialization of data as it is passed across the network or read and written from the disk. Every data type to be used as a value input or output from a mapper or reducer (that is, V1
, V2
, or V3
) must implement this interface.
Data to be used as keys (K1
, K2
, K3
) has a stricter requirement: in addition to Writable
, it must also provide an implementation of the standard Java Comparable
interface. This has the following specification:
public interface Comparable
{
    public int compareTo(Object obj);
}
The compareTo method returns a negative value, 0, or a positive value depending on whether the current object is less than, equal to, or greater than the object it is being compared with.
As a convenience interface, Hadoop provides the WritableComparable
interface in the org.apache.hadoop.io
package.
public interface WritableComparable extends Writable, Comparable {}
Introducing the wrapper classes
Fortunately, you don't have to start from scratch; as you've already seen, Hadoop provides classes that wrap the Java primitive types and implement WritableComparable
. They are provided in the org.apache.hadoop.io
package.
Primitive wrapper classes
These classes are conceptually similar to the primitive wrapper classes, such as Integer
and Long
found in java.lang
. They hold a single primitive value that can be set either at construction or via a setter method.
- BooleanWritable
- ByteWritable
- DoubleWritable
- FloatWritable
- IntWritable
- LongWritable
- VIntWritable – a variable length integer type
- VLongWritable – a variable length long type
Array wrapper classes
These classes provide writable wrappers for arrays of other Writable
objects. For example, an instance of either could hold an array of IntWritable
or DoubleWritable
but not arrays of the raw int or float types. A specific subclass for the required Writable
class will be required. They are as follows:
- ArrayWritable
- TwoDArrayWritable
Map wrapper classes
These classes allow implementations of the java.util.Map
interface to be used as keys or values. Note that they are defined as Map<Writable, Writable>
and effectively manage a degree of internal runtime type checking. This does mean that compile-time type checking is weakened, so be careful.
- AbstractMapWritable: This is a base class for other concrete Writable map implementations
- MapWritable: This is a general purpose map mapping Writable keys to Writable values
- SortedMapWritable: This is a specialization of the MapWritable class that also implements the SortedMap interface
Time for action – using the Writable wrapper classes
Let's write a class to show some of these wrapper classes in action:
- Create the following as
WritablesTest.java
:import org.apache.hadoop.io.* ; import java.util.* ; public class WritablesTest { public static class IntArrayWritable extends ArrayWritable { public IntArrayWritable() { super(IntWritable.class) ; } } public static void main(String[] args) { System.out.println("*** Primitive Writables ***") ; BooleanWritable bool1 = new BooleanWritable(true) ; ByteWritable byte1 = new ByteWritable( (byte)3) ; System.out.printf("Boolean:%s Byte:%d\n", bool1, byte1.get()) ; IntWritable i1 = new IntWritable(5) ; IntWritable i2 = new IntWritable( 17) ; System.out.printf("I1:%d I2:%d\n", i1.get(), i2.get()) ; i1.set(i2.get()) ; System.out.printf("I1:%d I2:%d\n", i1.get(), i2.get()) ; Integer i3 = new Integer( 23) ; i1.set( i3) ; System.out.printf("I1:%d I2:%d\n", i1.get(), i2.get()) ; System.out.println("*** Array Writables ***") ; ArrayWritable a = new ArrayWritable( IntWritable.class) ; a.set( new IntWritable[]{ new IntWritable(1), new IntWritable(3), new IntWritable(5)}) ; IntWritable[] values = (IntWritable[])a.get() ; for (IntWritable i: values) System.out.println(i) ; IntArrayWritable ia = new IntArrayWritable() ; ia.set( new IntWritable[]{ new IntWritable(1), new IntWritable(3), new IntWritable(5)}) ; IntWritable[] ivalues = (IntWritable[])ia.get() ; ia.set(new LongWritable[]{new LongWritable(1000l)}) ; System.out.println("*** Map Writables ***") ; MapWritable m = new MapWritable() ; IntWritable key1 = new IntWritable(5) ; NullWritable value1 = NullWritable.get() ; m.put(key1, value1) ; System.out.println(m.containsKey(key1)) ; System.out.println(m.get(key1)) ; m.put(new LongWritable(1000000000), key1) ; Set<Writable> keys = m.keySet() ; for(Writable w: keys) System.out.println(w.getClass()) ; } }
- Compile and run the class, and you should get the following output:
*** Primitive Writables ***
Boolean:true Byte:3
I1:5 I2:17
I1:17 I2:17
I1:23 I2:17
*** Array Writables ***
1
3
5
*** Map Writables ***
true
(null)
class org.apache.hadoop.io.LongWritable
class org.apache.hadoop.io.IntWritable
What just happened?
This output should be largely self-explanatory. We create various Writable
wrapper objects and show their general usage. There are several key points:
- As mentioned, there is no type-safety beyond
Writable
itself. So it is possible to have an array or map that holds multiple types, as shown previously. - We can use autounboxing, for example, by supplying an
Integer
object to methods onIntWritable
that expect anint
variable. - The inner class demonstrates what is needed if an
ArrayWritable
class is to be used as an input to areduce
function; a subclass with such a default constructor must be defined.
Other wrapper classes
- CompressedWritable: This is a base class to allow for large objects that should remain compressed until their attributes are explicitly accessed
- ObjectWritable: This is a general-purpose generic object wrapper
- NullWritable: This is a singleton object representation of a null value
- VersionedWritable: This is a base implementation to allow writable classes to track versions over time
Have a go hero – playing with Writables
Write a class that exercises the NullWritable
and ObjectWritable
classes in the same way as the previous examples did.
Making your own
As you have seen from the Writable
and Comparable
interfaces, the required methods are pretty straightforward; don't be afraid of adding this functionality if you want to use your own custom classes as keys or values within a MapReduce job.
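As a minimal sketch (a hypothetical composite key with made-up field names, not used elsewhere in this book), a custom key type might look like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// A composite key of (symbol, timestamp) that Hadoop can serialize and
// sort during the shuffle.
public class SymbolTimeKey implements WritableComparable<SymbolTimeKey>
{
    private Text symbol = new Text();
    private LongWritable timestamp = new LongWritable();

    public void set(String sym, long time)
    {
        symbol.set(sym);
        timestamp.set(time);
    }

    public void write(DataOutput out) throws IOException
    {
        symbol.write(out);
        timestamp.write(out);
    }

    public void readFields(DataInput in) throws IOException
    {
        symbol.readFields(in);
        timestamp.readFields(in);
    }

    public int compareTo(SymbolTimeKey other)
    {
        int cmp = symbol.compareTo(other.symbol);
        return (cmp != 0) ? cmp : timestamp.compareTo(other.timestamp);
    }

    @Override
    public int hashCode()
    {
        // Used by the default HashPartitioner to assign keys to reducers.
        return symbol.hashCode() * 163 + timestamp.hashCode();
    }
}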
Input/output
There is one aspect of our driver classes that we have mentioned several times without getting into a detailed explanation: the format and structure of the data input into and output from MapReduce jobs.
Files, splits, and records
We have talked about files being broken into splits as part of the job startup and the data in a split being sent to the mapper implementation. However, this overlooks two aspects: how the data is stored in the file and how the individual keys and values are passed to the mapper.
InputFormat and RecordReader
Hadoop has the concept of an InputFormat for the first of these responsibilities. The InputFormat
abstract class in the org.apache.hadoop.mapreduce
package provides two methods as shown in the following code:
public abstract class InputFormat<K, V>
{
    public abstract List<InputSplit> getSplits(JobContext context);
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context);
}
These methods display the two responsibilities of the InputFormat
class:
- To provide the details on how to split an input file into the splits required for map processing
- To create a
RecordReader
class that will generate the series of key/value pairs from a split
The RecordReader
class is also an abstract class within the org.apache.hadoop.mapreduce
package:
public abstract class RecordReader<Key, Value> implements Closeable
{
    public abstract void initialize(InputSplit split, TaskAttemptContext context);
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    public abstract Key getCurrentKey() throws IOException, InterruptedException;
    public abstract Value getCurrentValue() throws IOException, InterruptedException;
    public abstract float getProgress() throws IOException, InterruptedException;
    public abstract void close() throws IOException;
}
A RecordReader
instance is created for each split; its nextKeyValue method returns a Boolean indicating whether another key/value pair is available and, if so, the getCurrentKey and getCurrentValue methods are used to access the key and value respectively.
The combination of the InputFormat
and RecordReader
classes therefore are all that is required to bridge between any kind of input data and the key/value pairs required by MapReduce.
Hadoop-provided InputFormat
There are some Hadoop-provided InputFormat
implementations within the org.apache.hadoop.mapreduce.lib.input
package:
- FileInputFormat: This is an abstract base class that can be the parent of any file-based input
- SequenceFileInputFormat: This reads the efficient binary SequenceFile format that will be discussed in an upcoming section
- TextInputFormat: This is used for plain text files
Tip
The pre-0.20 API has additional InputFormats defined in the org.apache.hadoop.mapred
package.
Note that InputFormats
are not restricted to reading from files; FileInputFormat
is itself a subclass of InputFormat
. It is possible to have Hadoop use data that is not based on the files as the input to MapReduce jobs; common sources are relational databases or HBase.
Hadoop-provided RecordReader
Similarly, Hadoop provides a few common RecordReader
implementations, which are also present within the org.apache.hadoop.mapreduce.lib.input
package:
- LineRecordReader: This is the default RecordReader implementation for text files; it presents the byte offset of each line within the file as the key and the line contents as the value
- SequenceFileRecordReader: This implementation reads the key/value pairs from the binary SequenceFile container
classes in the org.apache.hadoop.mapred
package, such as KeyValueRecordReader
, that have not yet been ported to the new API.
OutputFormat and RecordWriter
There is a similar pattern for writing the output of a job coordinated by subclasses of OutputFormat
and RecordWriter
from the org.apache.hadoop.mapreduce
package. We'll not explore these in any detail here, but the general approach is similar, though OutputFormat
does have a more involved API as it has methods for tasks such as validation of the output specification.
Tip
It is this step that causes a job to fail if a specified output directory already exists. If you wanted different behavior, it would require a subclass of OutputFormat
that overrides this method.
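As a hedged sketch of that idea (overwriting existing output is rarely what you want, so treat this as illustrative only), such a subclass could override the checkOutputSpecs method used for this validation:

// WARNING: this skips the check that normally stops a job from clobbering
// an existing output directory, along with any other validation performed
// by the parent class.
public class OverwritingTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException
    {
        // Deliberately do not call super.checkOutputSpecs(context).
    }
}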
Hadoop-provided OutputFormat
The following OutputFormats are provided in the org.apache.hadoop.mapreduce.output
package:
- FileOutputFormat: This is the base class for all file-based OutputFormats
- NullOutputFormat: This is a dummy implementation that discards the output and writes nothing to the file
- SequenceFileOutputFormat: This writes to the binary SequenceFile format
- TextOutputFormat: This writes a plain text file
Note that these classes define their required RecordWriter
implementations as inner classes so there are no separately provided RecordWriter
implementations.
Don't forget Sequence files
The SequenceFile
class within the org.apache.hadoop.io
package provides an efficient binary file format that is often useful as an output from a MapReduce job. This is especially true if the output from the job is processed as the input of another job. The Sequence
files have several advantages, as follows:
- As binary files, they are intrinsically more compact than text files
- They additionally support optional compression, which can also be applied at different levels, that is, compress each record or an entire split
- The file can be split and processed in parallel
This last characteristic is important, as most binary formats—particularly those that are compressed or encrypted—cannot be split and must be read as a single linear stream of data. Using such files as input to a MapReduce job means that a single mapper will be used to process the entire file, causing a potentially large performance hit. In such a situation, it is preferable to either use a splitable format such as SequenceFile
, or, if you cannot avoid receiving the file in the other format, do a preprocessing step that converts it into a splitable format. This will be a trade-off, as the conversion will take time; but in many cases—especially with complex map tasks—this will be outweighed by the time saved.
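As a brief optional sketch of how a driver might switch to this format (assuming the job's key and value classes are left unchanged):

// Write the job output as a block-compressed SequenceFile instead of plain text.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);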
Summary
We have covered a lot of ground in this chapter and we now have the foundation to explore MapReduce in more detail. Specifically, we learned how key/value pairs form a broadly applicable data model that is well suited to MapReduce processing. We also learned how to write mapper and reducer implementations using the 0.20 and above versions of the Java API.
We then moved on and saw how a MapReduce job is processed and how the map
and reduce
methods are tied together by significant coordination and task-scheduling machinery. We also saw how certain MapReduce jobs require specialization in the form of a custom partitioner or combiner.
We also learned how Hadoop reads data from and writes data to the filesystem. It uses the concept of InputFormat
and OutputFormat
to handle the file as a whole and RecordReader
and RecordWriter
to translate the format to and from key/value pairs.
With this knowledge, we will now move on to a case study in the next chapter, which demonstrates the ongoing development and enhancement of a MapReduce application that processes a large data set.