Hadoop: Data Processing and Modelling

Chapter 2. Getting Hadoop Up and Running

Now that we have explored the opportunities and challenges presented by large-scale data processing and why Hadoop is a compelling choice, it's time to get things set up and running.

In this chapter, we will do the following:

  • Learn how to install and run Hadoop on a local Ubuntu host
  • Run some example Hadoop programs and get familiar with the system
  • Set up the accounts required to use Amazon Web Services products such as EMR
  • Create an on-demand Hadoop cluster on Elastic MapReduce
  • Explore the key differences between a local and hosted Hadoop cluster

Hadoop on a local Ubuntu host

For our exploration of Hadoop outside the cloud, we shall give examples using one or more Ubuntu hosts. A single machine (be it a physical computer or a virtual machine) will be sufficient to run all the parts of Hadoop and explore MapReduce. However, production clusters will most likely involve many more machines, so having even a development Hadoop cluster deployed across multiple hosts will be good experience. For getting started, though, a single host will suffice.

Nothing we discuss will be unique to Ubuntu, and Hadoop should run on any Linux distribution. Obviously, you may have to alter how the environment is configured if you use a distribution other than Ubuntu, but the differences should be slight.

Other operating systems

Hadoop does run well on other platforms. Windows and Mac OS X are popular choices for developers. Windows is supported only as a development platform and Mac OS X is not formally supported at all.

If you choose to use such a platform, the general situation will be similar to other Linux distributions; all aspects of how to work with Hadoop will be the same on both platforms, but you will need to use the operating system-specific mechanisms for setting up environment variables and similar tasks. The Hadoop FAQs contain some information on alternative platforms and should be your first port of call if you are considering such an approach. The Hadoop FAQs can be found at http://wiki.apache.org/hadoop/FAQ.

Time for action – checking the prerequisites

Hadoop is written in Java, so you will need a recent Java Development Kit (JDK) installed on the Ubuntu host. Perform the following steps to check the prerequisites:

  1. First, check what's already available by opening up a terminal and typing the following:
    $ javac
    $ java -version
    
  2. If either of these commands gives a no such file or directory or similar error, or if the latter mentions "Open JDK", it's likely you need to download the full JDK. Grab this from the Oracle download page at http://www.oracle.com/technetwork/java/javase/downloads/index.html; you should get the latest release.
  3. Once Java is installed, add the JDK/bin directory to your path and set the JAVA_HOME environment variable with commands such as the following, modified for your specific Java version:
    $ export JAVA_HOME=/opt/jdk1.6.0_24
    $ export PATH=$JAVA_HOME/bin:${PATH}
    

What just happened?

These steps ensure the right version of Java is installed and available from the command line without having to use lengthy pathnames to refer to the install location.

Remember that the preceding commands only affect the currently running shell and the settings will be lost after you log out, close the shell, or reboot. To ensure the same setup is always available, you can add these to the startup files for your shell of choice, within the .bash_profile file for the BASH shell or the .cshrc file for TCSH, for example.

An alternative favored by me is to put all required configuration settings into a standalone file and then explicitly call this from the command line; for example:

$ source Hadoop_config.sh

This technique allows you to keep multiple setup files in the same account without making the shell startup overly complex; not to mention, the required configurations for several applications may actually be incompatible. Just remember to begin by loading the file at the start of each session!
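As a minimal sketch (the paths here are the ones assumed in this chapter, so adjust them for your own installation), such a file might contain little more than the following:

$ cat Hadoop_config.sh
# Java location used in the earlier examples; change to match your JDK
export JAVA_HOME=/opt/jdk1.6.0_24
export PATH=$JAVA_HOME/bin:${PATH}
# Hadoop-related variables can be added here once Hadoop is installed later in this chapter

Sourcing this file at the start of each session then gives a consistent environment without touching the shell startup files.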

Setting up Hadoop

One of the most confusing aspects of Hadoop to a newcomer is its various components, projects, sub-projects, and their interrelationships. The fact that these have evolved over time hasn't made the task of understanding it all any easier. For now, though, go to http://hadoop.apache.org and you'll see that there are three prominent projects mentioned:

  • Common
  • HDFS
  • MapReduce

The last two of these should be familiar from the explanation in Chapter 1, What It's All About; the Common project comprises a set of libraries and tools that help the Hadoop product work in the real world. For now, the important thing is that the standard Hadoop distribution bundles the latest versions of all three of these projects, and the combination is what you need to get going.

A note on versions

Hadoop underwent a major change in the transition from the 0.19 to the 0.20 versions, most notably with a migration to a set of new APIs used to develop MapReduce applications. We will be primarily using the new APIs in this book, though we do include a few examples of the older API in later chapters, as not all of the existing features have been ported to the new API.

Hadoop versioning also became complicated when the 0.20 branch was renamed to 1.0. The 0.22 and 0.23 branches remained, and in fact included features not included in the 1.0 branch. At the time of this writing, things were becoming clearer with 1.1 and 2.0 branches being used for future development releases. As most existing systems and third-party tools are built against the 0.20 branch, we will use Hadoop 1.0 for the examples in this book.

Time for action – downloading Hadoop

Carry out the following steps to download Hadoop:

  1. Go to the Hadoop download page at http://hadoop.apache.org/common/releases.html and retrieve the latest stable version of the 1.0.x branch; at the time of this writing, it was 1.0.4.
  2. You'll be asked to select a local mirror; after that you need to download the file with a name such as hadoop-1.0.4-bin.tar.gz.
  3. Copy this file to the directory where you want Hadoop to be installed (for example, /usr/local), using the following command:
    $ cp hadoop-1.0.4-bin.tar.gz /usr/local
    
  4. Decompress the file, from within /usr/local, by using the following command:
    $ tar -xf hadoop-1.0.4-bin.tar.gz
    
  5. Add a convenient symlink to the Hadoop installation directory.
    $ ln -s /usr/local/hadoop-1.0.4 /opt/hadoop
    
  6. Now you need to add the Hadoop binary directory to your path and set the HADOOP_HOME environment variable, just as we did earlier with Java.
    $ export HADOOP_HOME=/opt/hadoop
    $ export PATH=$HADOOP_HOME/bin:$PATH
    
  7. Go into the conf directory within the Hadoop installation and edit the hadoop-env.sh file. Search for JAVA_HOME and uncomment the line, modifying the location to point to your JDK installation, as mentioned earlier and as sketched just after this list.
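As a rough guide, using the same JDK location assumed earlier, the edited line in conf/hadoop-env.sh should end up looking something like this:

# JAVA_HOME uncommented and pointed at the JDK installed earlier
export JAVA_HOME=/opt/jdk1.6.0_24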

What just happened?

These steps ensure that Hadoop is installed and available from the command line. By setting the path and configuration variables, we can use the Hadoop command-line tool. The modification to the Hadoop configuration file is the only required change to the setup needed to integrate with your host settings.

As mentioned earlier, you should put the export commands in your shell startup file or a standalone-configuration script that you specify at the start of the session.

Don't worry about some of the details here; we'll cover Hadoop setup and use later.

Time for action – setting up SSH

Carry out the following steps to set up SSH:

  1. Create a new SSH key pair with the following commands:
    $ ssh-keygen
    Generating public/private rsa key pair.
    Enter file in which to save the key (/home/hadoop/.ssh/id_rsa): 
    Created directory '/home/hadoop/.ssh'.
    Enter passphrase (empty for no passphrase): 
    Enter same passphrase again: 
    Your identification has been saved in /home/hadoop/.ssh/id_rsa.
    Your public key has been saved in /home/hadoop/.ssh/id_rsa.pub.
    
    
  2. Copy the new public key to the list of authorized keys by using the following command:
    $ cp .ssh/id_rsa.pub .ssh/authorized_keys
    
  3. Connect to the local host.
    $ ssh localhost
    The authenticity of host 'localhost (127.0.0.1)' can't be established.
    RSA key fingerprint is b6:0c:bd:57:32:b6:66:7c:33:7b:62:92:61:fd:ca:2a.
    Are you sure you want to continue connecting (yes/no)? yes
    Warning: Permanently added 'localhost' (RSA) to the list of known hosts.
    
  4. Confirm that the password-less SSH is working.
    $ ssh localhost
    $ ssh localhost
    

What just happened?

Because Hadoop requires communication between multiple processes on one or more machines, we need to ensure that the user we are using for Hadoop can connect to each required host without needing a password. We do this by creating a Secure Shell (SSH) key pair that has an empty passphrase. We use the ssh-keygen command to start this process and accept the offered defaults.

Once we create the key pair, we need to add the new public key to the stored list of trusted keys; this means that when trying to connect to this machine, the public key will be trusted. After doing so, we use the ssh command to connect to the local machine and should expect to get a warning about trusting the host key, as just shown. After confirming this, we should then be able to connect without further passwords or prompts.

Note

Note that when we move later to use a fully distributed cluster, we will need to ensure that the Hadoop user account has the same key set up on every host in the cluster.
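As a sketch of how that key distribution might look, the ssh-copy-id utility appends our existing public key to authorized_keys on a remote host; the hostnames here are purely illustrative:

$ ssh-copy-id hadoop@slave1
$ ssh-copy-id hadoop@slave2
$ ssh hadoop@slave1    # should now connect without a password prompt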

Configuring and running Hadoop

So far this has all been pretty straightforward, just downloading and system administration. Now we can deal with Hadoop directly. Finally! We'll run a quick example to show Hadoop in action. There is additional configuration and set up to be performed, but this next step will help give confidence that things are installed and configured correctly so far.

Time for action – using Hadoop to calculate Pi

We will now use a sample Hadoop program to calculate the value of Pi. Right now, this is primarily to validate the installation and to show how quickly you can get a MapReduce job to execute. Assuming the HADOOP_HOME/bin directory is in your path, type the following commands:

$ hadoop jar hadoop/hadoop-examples-1.0.4.jar pi 4 1000
Number of Maps = 4
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Starting Job
12/10/26 22:56:11 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/10/26 22:56:11 INFO mapred.FileInputFormat: Total input paths to process : 4
12/10/26 22:56:12 INFO mapred.JobClient: Running job: job_local_0001
12/10/26 22:56:12 INFO mapred.FileInputFormat: Total input paths to process : 4
12/10/26 22:56:12 INFO mapred.MapTask: numReduceTasks: 1

12/10/26 22:56:14 INFO mapred.JobClient: map 100% reduce 100%
12/10/26 22:56:14 INFO mapred.JobClient: Job complete: job_local_0001
12/10/26 22:56:14 INFO mapred.JobClient: Counters: 13
12/10/26 22:56:14 INFO mapred.JobClient: FileSystemCounters

Job Finished in 2.904 seconds
Estimated value of Pi is 3.14000000000000000000
$

What just happened?

There's a lot of information here; even more so when you get the full output on your screen. For now, let's unpack the fundamentals and not worry about much of Hadoop's status output until later in the book. The first thing to clarify is some terminology; each Hadoop program runs as a job that creates multiple tasks to do its work.

Looking at the output, we see it is broadly split into three sections:

  • The start up of the job
  • The status as the job executes
  • The output of the job

In our case, we can see that the job creates four tasks to calculate Pi, and the overall job result is the combination of these subresults. This pattern should be familiar from Chapter 1, What It's All About; the model is used to split a larger job into smaller pieces and then bring together the results.

The majority of the output will appear as the job is being executed and provide status messages showing progress. On successful completion, the job will print out a number of counters and other statistics. The preceding example is actually unusual in that it is rare to see the result of a MapReduce job displayed on the console. This is not a limitation of Hadoop, but rather a consequence of the fact that jobs that process large data sets usually produce a significant amount of output data that isn't well suited to a simple echoing on the screen.

Congratulations on your first successful MapReduce job!

Three modes

In our desire to get something running on Hadoop, we sidestepped an important issue: in which mode should we run Hadoop? There are three possibilities that alter where the various Hadoop components execute. Recall that HDFS comprises a single NameNode that acts as the cluster coordinator and is the master for one or more DataNodes that store the data. For MapReduce, the JobTracker is the cluster master and it coordinates the work executed by one or more TaskTracker processes. The Hadoop modes deploy these components as follows:

  • Local standalone mode: This is the default mode if, as in the preceding Pi example, you don't configure anything else. In this mode, all the components of Hadoop, such as NameNode, DataNode, JobTracker, and TaskTracker, run in a single Java process.
  • Pseudo-distributed mode: In this mode, a separate JVM is spawned for each of the Hadoop components and they communicate across network sockets, effectively giving a fully functioning minicluster on a single host.
  • Fully distributed mode: In this mode, Hadoop is spread across multiple machines, some of which will be general-purpose workers and others will be dedicated hosts for components, such as NameNode and JobTracker.

Each mode has its benefits and drawbacks. Fully distributed mode is obviously the only one that can scale Hadoop across a cluster of machines, but it requires more configuration work, not to mention the cluster of machines. Local, or standalone, mode is the easiest to set up, but you interact with it in a different manner than you would with the fully distributed mode. In this book, we shall generally prefer the pseudo-distributed mode even when using examples on a single host, as everything done in the pseudo-distributed mode is almost identical to how it works on a much larger cluster.

Time for action – configuring the pseudo-distributed mode

Take a look in the conf directory within the Hadoop distribution. There are many configuration files, but the ones we need to modify are core-site.xml, hdfs-site.xml and mapred-site.xml.

  1. Modify core-site.xml to look like the following code:
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <!-- Put site-specific property overrides in this file. -->
    
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
      </property>
    </configuration>
  2. Modify hdfs-site.xml to look like the following code:
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <!-- Put site-specific property overrides in this file. -->
    
    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    </configuration>
  3. Modify mapred-site.xml to look like the following code:
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <!-- Put site-specific property overrides in this file. -->
    
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
      </property>
    </configuration>

What just happened?

The first thing to note is the general format of these configuration files. They are obviously XML and contain multiple property specifications within a single configuration element.

The property specifications always contain name and value elements with the possibility for optional comments not shown in the preceding code.

We set three configuration variables here:

  • The fs.default.name variable holds the location of the NameNode and is required by both HDFS and MapReduce components, which explains why it's in core-site.xml and not hdfs-site.xml.
  • The dfs.replication variable specifies how many times each HDFS block should be replicated. Recall from Chapter 1, What It's All About, that HDFS handles failures by ensuring each block of filesystem data is replicated to a number of different hosts, usually 3. As we only have a single host and one DataNode in the pseudo-distributed mode, we change this value to 1.
  • The mapred.job.tracker variable holds the location of the JobTracker just as fs.default.name holds the location of the NameNode. Because only MapReduce components need to know this location, it is in mapred-site.xml (a quick check of these values follows this list).
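That quick check is nothing more than a crude grep of the configuration files; the path below assumes the /opt/hadoop symlink created during installation:

$ grep -A 1 fs.default.name /opt/hadoop/conf/core-site.xml    # should show hdfs://localhost:9000
$ grep -A 1 mapred.job.tracker /opt/hadoop/conf/mapred-site.xml    # should show localhost:9001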

Note

You are free, of course, to change the port numbers used, though 9000 and 9001 are common conventions in Hadoop.

The network addresses for the NameNode and the JobTracker specify the ports to which the actual system requests should be directed. These are not user-facing locations, so don't bother pointing your web browser at them; there are web interfaces that we will look at shortly.

Configuring the base directory and formatting the filesystem

If the pseudo-distributed or fully distributed mode is chosen, there are two steps that need to be performed before we start our first Hadoop cluster.

  1. Set the base directory where Hadoop files will be stored.
  2. Format the HDFS filesystem.

Note

Strictly speaking, we don't have to change the default directory, but as we'll see shortly, it's a good idea to do so now.

Time for action – changing the base HDFS directory

Let's first set the base directory that specifies the location on the local filesystem under which Hadoop will keep all its data. Carry out the following steps:

  1. Create a directory into which Hadoop will store its data:
    $ mkdir /var/lib/hadoop
    
  2. Ensure the directory is writeable by any user:
    $ chmod 777 /var/lib/hadoop
    
  3. Modify core-site.xml once again to add the following property:
    <property>
      <name>hadoop.tmp.dir</name>
      <value>/var/lib/hadoop</value>
    </property>

What just happened?

As we will be storing data in Hadoop and all the various components are running on our local host, this data will need to be stored on our local filesystem somewhere. Regardless of the mode, Hadoop by default uses the hadoop.tmp.dir property as the base directory under which all files and data are written.

MapReduce, for example, uses a /mapred directory under this base directory; HDFS uses /dfs. The danger is that the default value of hadoop.tmp.dir is /tmp and some Linux distributions delete the contents of /tmp on each reboot. So it's safer to explicitly state where the data is to be held.
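Once the cluster has been started and used, a listing of the new base directory should show these component subdirectories appearing; the exact names and layout can vary slightly between Hadoop versions, so treat this as indicative only:

$ ls /var/lib/hadoop
dfs  mapred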

Time for action – formatting the NameNode

Before starting Hadoop in either pseudo-distributed or fully distributed mode for the first time, we need to format the HDFS filesystem that it will use. Type the following:

$ hadoop namenode -format

The output of this should look like the following:

$ hadoop namenode -format
12/10/26 22:45:25 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = vm193/10.0.0.193
STARTUP_MSG: args = [-format]

12/10/26 22:45:25 INFO namenode.FSNamesystem: fsOwner=hadoop,hadoop
12/10/26 22:45:25 INFO namenode.FSNamesystem: supergroup=supergroup
12/10/26 22:45:25 INFO namenode.FSNamesystem: isPermissionEnabled=true
12/10/26 22:45:25 INFO common.Storage: Image file of size 96 saved in 0 seconds.
12/10/26 22:45:25 INFO common.Storage: Storage directory /var/lib/hadoop-hadoop/dfs/name has been successfully formatted.
12/10/26 22:45:26 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at vm193/10.0.0.193
$ 

What just happened?

This is not a very exciting output because the step is only an enabler for our future use of HDFS. However, it does help us think of HDFS as a filesystem; just like any new storage device on any operating system, we need to format the device before we can use it. The same is true for HDFS; initially there is a default location for the filesystem data but no actual data for the equivalents of filesystem indexes.

Note

Do this every time!

If your experience with Hadoop has been similar to the one I have had, there will be a series of simple mistakes that are frequently made when setting up new installations. It is very easy to forget about the formatting of the NameNode and then get a cascade of failure messages when the first Hadoop activity is tried.

But do it only once!

The command to format the NameNode can be executed multiple times, but in doing so all existing filesystem data will be destroyed. It can only be executed when the Hadoop cluster is shut down. You will sometimes want to do this, but in most cases it is a quick way to irrevocably delete every piece of data on HDFS (and it does take longer on large clusters). So be careful!
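If you ever do need to reformat, a safe sequence is to stop the cluster first and only then run the format command; a minimal sketch, using the combined start/stop scripts mentioned later in this chapter:

$ stop-all.sh                # make sure no Hadoop processes are running
$ hadoop namenode -format    # destroys all existing HDFS data!
$ start-all.sh               # bring the (now empty) cluster back up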

Starting and using Hadoop

After all that configuration and setup, let's now start our cluster and actually do something with it.

Time for action – starting Hadoop

Unlike the local mode of Hadoop, where all the components run only for the lifetime of the submitted job, with the pseudo-distributed or fully distributed mode of Hadoop, the cluster components exist as long-running processes. Before we use HDFS or MapReduce, we need to start up the needed components. Type the following commands; the output should look as shown next, where the commands are included on the lines prefixed by $:

  1. Type in the first command:
    $ start-dfs.sh
    starting namenode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-namenode-vm193.out
    localhost: starting datanode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-datanode-vm193.out
    localhost: starting secondarynamenode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-secondarynamenode-vm193.out
    
  2. Type in the second command:
    $ jps
    9550 DataNode
    9687 Jps
    9638 SecondaryNameNode
    9471 NameNode
    
  3. Type in the third command:
    $ hadoop dfs -ls /
    Found 2 items
    drwxr-xr-x - hadoop supergroup 0 2012-10-26 23:03 /tmp
    drwxr-xr-x - hadoop supergroup 0 2012-10-26 23:06 /user
    
  4. Type in the fourth command:
    $ start-mapred.sh 
    starting jobtracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-jobtracker-vm193.out
    localhost: starting tasktracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-tasktracker-vm193.out
    
  5. Type in the fifth command:
    $ jps
    9550 DataNode
    9877 TaskTracker
    9638 SecondaryNameNode
    9471 NameNode
    9798 JobTracker
    9913 Jps
    

What just happened?

The start-dfs.sh command, as the name suggests, starts the components necessary for HDFS. This is the NameNode to manage the filesystem and a single DataNode to hold data. The SecondaryNameNode is an availability aid that we'll discuss in a later chapter.

After starting these components, we use the JDK's jps utility to see which Java processes are running, and, as the output looks good, we then use Hadoop's dfs utility to list the root of the HDFS filesystem.

After this, we use start-mapred.sh to start the MapReduce components—this time the JobTracker and a single TaskTracker—and then use jps again to verify the result.

There is also a combined start-all.sh file that we'll use at a later stage, but in the early days it's useful to do a two-stage start up to more easily verify the cluster configuration.
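As a quick sketch, the combined scripts and their counterparts live alongside start-dfs.sh in the Hadoop bin directory:

$ start-all.sh    # starts the HDFS components and then the MapReduce components
$ stop-all.sh     # stops the MapReduce components and then the HDFS components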

Time for action – using HDFS

As the preceding example shows, there is a familiar-looking interface to HDFS that allows us to use commands similar to those in Unix to manipulate files and directories on the filesystem. Let's try it out by typing the following commands:

$ hadoop fs -mkdir /user
$ hadoop fs -mkdir /user/hadoop
$ hadoop fs -ls /user
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2012-10-26 23:09 /user/hadoop
$ echo "This is a test." >> test.txt
$ cat test.txt
This is a test.
$ hadoop dfs -copyFromLocal test.txt .
$ hadoop dfs -ls
Found 1 items
-rw-r--r-- 1 hadoop supergroup 16 2012-10-26 23:19 /user/hadoop/test.txt
$ hadoop dfs -cat test.txt
This is a test.
$ rm test.txt 
$ hadoop dfs -cat test.txt
This is a test.
$ hadoop fs -copyToLocal test.txt
$ cat test.txt
This is a test.

What just happened?

This example shows the use of the fs subcommand to the Hadoop utility (note that the dfs and fs commands are equivalent). Like most filesystems, Hadoop has the concept of a home directory for each user. These home directories are stored under the /user directory on HDFS and, before we go further, we create our home directory if it does not already exist.

We then create a simple text file on the local filesystem, copy it to HDFS by using the copyFromLocal command, and then check its existence and contents by using the -ls and -cat utilities. As can be seen, the user home directory is aliased to ., because, as in Unix, ls commands with no path specified are assumed to refer to that directory, and relative paths (those not starting with /) are resolved against it.

We then deleted the file from the local filesystem, copied it back from HDFS by using the -copyToLocal command, and checked its contents using the local cat utility.

Note

Mixing HDFS and local filesystem commands, as in the preceding example, is a powerful combination, and it's very easy to execute commands on HDFS that were intended for the local filesystem and vice versa. So be careful, especially when deleting.

There are other HDFS manipulation commands; try hadoop fs -help for a detailed list.
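A few of the more commonly used ones are sketched here; they mirror their Unix namesakes closely, and the filenames are only placeholders:

$ hadoop fs -put somefile.txt .      # same effect as -copyFromLocal
$ hadoop fs -mv somefile.txt renamed.txt
$ hadoop fs -get renamed.txt .       # same effect as -copyToLocal
$ hadoop fs -du .
$ hadoop fs -rm renamed.txt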

Time for action – WordCount, the Hello World of MapReduce

Many applications, over time, acquire a canonical example that no beginner's guide should be without. For Hadoop, this is WordCount – an example bundled with Hadoop that counts the frequency of words in an input text file.

  1. First execute the following commands:
    $ hadoop dfs -mkdir data
    $ hadoop dfs -cp test.txt data
    $ hadoop dfs -ls data
    Found 1 items
    -rw-r--r-- 1 hadoop supergroup 16 2012-10-26 23:20 /user/hadoop/data/test.txt
    
  2. Now execute these commands:
    $ hadoop jar hadoop/hadoop-examples-1.0.4.jar wordcount data out
    12/10/26 23:22:49 INFO input.FileInputFormat: Total input paths to process : 1
    12/10/26 23:22:50 INFO mapred.JobClient: Running job: job_201210262315_0002
    12/10/26 23:22:51 INFO mapred.JobClient: map 0% reduce 0%
    12/10/26 23:23:03 INFO mapred.JobClient: map 100% reduce 0%
    12/10/26 23:23:15 INFO mapred.JobClient: map 100% reduce 100%
    12/10/26 23:23:17 INFO mapred.JobClient: Job complete: job_201210262315_0002
    12/10/26 23:23:17 INFO mapred.JobClient: Counters: 17
    12/10/26 23:23:17 INFO mapred.JobClient: Job Counters 
    12/10/26 23:23:17 INFO mapred.JobClient: Launched reduce tasks=1
    12/10/26 23:23:17 INFO mapred.JobClient: Launched map tasks=1
    12/10/26 23:23:17 INFO mapred.JobClient: Data-local map tasks=1
    12/10/26 23:23:17 INFO mapred.JobClient: FileSystemCounters
    12/10/26 23:23:17 INFO mapred.JobClient: FILE_BYTES_READ=46
    12/10/26 23:23:17 INFO mapred.JobClient: HDFS_BYTES_READ=16
    12/10/26 23:23:17 INFO mapred.JobClient: FILE_BYTES_WRITTEN=124
    12/10/26 23:23:17 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=24
    12/10/26 23:23:17 INFO mapred.JobClient: Map-Reduce Framework
    12/10/26 23:23:17 INFO mapred.JobClient: Reduce input groups=4
    12/10/26 23:23:17 INFO mapred.JobClient: Combine output records=4
    12/10/26 23:23:17 INFO mapred.JobClient: Map input records=1
    12/10/26 23:23:17 INFO mapred.JobClient: Reduce shuffle bytes=46
    12/10/26 23:23:17 INFO mapred.JobClient: Reduce output records=4
    12/10/26 23:23:17 INFO mapred.JobClient: Spilled Records=8
    12/10/26 23:23:17 INFO mapred.JobClient: Map output bytes=32
    12/10/26 23:23:17 INFO mapred.JobClient: Combine input records=4
    12/10/26 23:23:17 INFO mapred.JobClient: Map output records=4
    12/10/26 23:23:17 INFO mapred.JobClient: Reduce input records=4
    
  3. Execute the following command:
    $ hadoop fs -ls out
    Found 2 items
    drwxr-xr-x - hadoop supergroup 0 2012-10-26 23:22 /user/hadoop/out/_logs
    -rw-r--r-- 1 hadoop supergroup 24 2012-10-26 23:23 /user/hadoop/out/part-r-00000
    
  4. Now execute this command:
    $ hadoop fs -cat out/part-r-00000
    This 1
    a 1
    is 1
    test. 1
    

What just happened?

We did three things here, as follows:

  • Moved the previously created text file into a new directory on HDFS
  • Ran the example WordCount job specifying this new directory and a non-existent output directory as arguments
  • Used the fs utility to examine the output of the MapReduce job

As we said earlier, the pseudo-distributed mode has more Java processes, so it may seem curious that the job output is significantly shorter than for the standalone Pi. The reason is that the local standalone mode prints information about each individual task execution to the screen, whereas in the other modes this information is written only to logfiles on the running hosts.

The output directory is created by Hadoop itself and the actual result files follow the part-nnnnn convention illustrated here; though given our setup, there is only one result file. We use the fs -cat command to examine the file, and the results are as expected.

Note

If you specify an existing directory as the output destination for a Hadoop job, it will fail to run and will throw an exception complaining of an already existing directory. If you want Hadoop to store the output to a directory, it must not exist. Treat this as a safety mechanism that stops Hadoop from overwriting the results of previous, valuable job runs; you will run into it frequently when you forget to remove an old output directory. If you are confident, you can override this behavior, as we will see later.
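If you do want to rerun a job with the same output location, the usual approach is simply to remove the old output directory first; a quick sketch using the directories from the preceding run:

$ hadoop fs -rmr out
$ hadoop jar hadoop/hadoop-examples-1.0.4.jar wordcount data out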

The Pi and WordCount programs are only some of the examples that ship with Hadoop. Here is how to get a list of them all. See if you can figure some of them out.

$ hadoop jar hadoop/hadoop-examples-1.0.4.jar 

Have a go hero – WordCount on a larger body of text

Running a complex framework like Hadoop utilizing five discrete Java processes to count the words in a single-line text file is not terribly impressive. The power comes from the fact that we can use exactly the same program to run WordCount on a larger file, or even a massive corpus of text spread across a multinode Hadoop cluster. If we had such a setup, we would execute exactly the same commands as we just did by running the program and simply specifying the location of the directories for the source and output data.

Find a large online text file—Project Gutenberg at http://www.gutenberg.org is a good starting point—and run WordCount on it by copying it onto the HDFS and executing the WordCount example. The output may not be as you expect because, in a large body of text, issues of dirty data, punctuation, and formatting will need to be addressed. Think about how WordCount could be improved; we'll study how to expand it into a more complex processing chain in the next chapter.
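A hedged sketch of the mechanics, assuming the text has already been downloaded to the local host (the filename here is just a placeholder):

$ hadoop fs -mkdir bigdata
$ hadoop fs -copyFromLocal mylargefile.txt bigdata
$ hadoop jar hadoop/hadoop-examples-1.0.4.jar wordcount bigdata bigout
$ hadoop fs -cat bigout/part-r-00000 | head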

Monitoring Hadoop from the browser

So far, we have been relying on command-line tools and direct command output to see what our system is doing. Hadoop provides two web interfaces that you should become familiar with, one for HDFS and the other for MapReduce. Both are useful in pseudo-distributed mode and are critical tools when you have a fully distributed setup.

The HDFS web UI

Point your web browser to port 50070 on the host running Hadoop. By default, the web interface should be available from both the local host and any other machine that has network access. Here is an example screenshot:

There is a lot going on here, but the immediately critical data tells us the number of nodes in the cluster, the filesystem size, used space, and links to drill down for more info and even browse the filesystem.

Spend a little time playing with this interface; it needs to become familiar. With a multinode cluster, the information about live and dead nodes plus the detailed information on their status history will be critical to debugging cluster problems.

The MapReduce web UI

The JobTracker UI is available on port 50030 by default, and the same access rules stated earlier apply. Here is an example screenshot:

This is more complex than the HDFS interface! Along with a similar count of the number of live/dead nodes, there is a history of the number of jobs executed since startup and a breakdown of their individual task counts.

The list of executing and historical jobs is a doorway to much more information; for every job, we can access the history of every task attempt on every node and access logs for detailed information. We now expose one of the most painful parts of working with any distributed system: debugging. It can be really hard.

Imagine you have a cluster of 100 machines trying to process a massive data set where the full job requires each host to execute hundreds of map and reduce tasks. If the job starts running very slowly or explicitly fails, it is not always obvious where the problem lies. Looking at the MapReduce web UI will likely be the first port of call because it provides such a rich starting point to investigate the health of running and historical jobs.
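If you want a quick command-line check that both interfaces are up before reaching for a browser, probing the default ports on a pseudo-distributed cluster looks something like the following (assuming curl is installed and the ports have not been changed):

$ curl -I http://localhost:50070/    # HDFS (NameNode) web UI
$ curl -I http://localhost:50030/    # MapReduce (JobTracker) web UI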

Using Elastic MapReduce

We will now turn to Hadoop in the cloud, the Elastic MapReduce service offered by Amazon Web Services. There are multiple ways to access EMR, but for now we will focus on the provided web console to contrast a full point-and-click approach to Hadoop with the previous command-line-driven examples.

Setting up an account in Amazon Web Services

Before using Elastic MapReduce, we need to set up an Amazon Web Services account and register it with the necessary services.

Creating an AWS account

Amazon has integrated their general accounts with AWS, meaning that if you already have an account for any of the Amazon retail websites, this is the only account you will need to use AWS services.

Note that AWS services have a cost; you will need an active credit card associated with the account to which charges can be made.

If you require a new Amazon account, go to http://aws.amazon.com, select create a new AWS account, and follow the prompts. Amazon has added a free tier for some services, so you may find that in the early days of testing and exploration you are keeping many of your activities within the non-charged tier. The scope of the free tier has been expanding, so make sure you know for what you will and won't be charged.

Signing up for the necessary services

Once you have an Amazon account, you will need to register it for use with the required AWS services, that is, Simple Storage Service (S3), Elastic Compute Cloud (EC2), and Elastic MapReduce (EMR). There is no cost for simply signing up to any AWS service; the process just makes the service available to your account.

Go to the S3, EC2, and EMR pages linked from http://aws.amazon.com and click on the Sign up button on each page; then follow the prompts.

Note

Caution! This costs real money!

Before going any further, it is critical to understand that use of AWS services will incur charges that will appear on the credit card associated with your Amazon account. Most of the charges are quite small and increase with the amount of infrastructure consumed; storing 10 GB of data in S3 costs 10 times more than for 1 GB, and running 20 EC2 instances costs 20 times as much as a single one. There are tiered cost models, so the actual costs tend to have smaller marginal increases at higher levels. But you should read carefully through the pricing sections for each service before using any of them. Note also that currently data transfer out of AWS services, such as EC2 and S3, is chargeable but data transfer between services is not. This means it is often most cost-effective to carefully design your use of AWS to keep data within AWS through as much of the data processing as possible.

Time for action – WordCount on EMR using the management console

Let's jump straight into an example on EMR using some provided example code. Carry out the following steps:

  1. Browse to http://aws.amazon.com and click on the Sign in to the AWS Console button. The default view should look like the following screenshot. If it does not, click on Amazon S3 from within the console.
  2. As shown in the preceding screenshot, click on the Create bucket button and enter a name for the new bucket. Bucket names must be globally unique across all AWS users, so do not expect obvious bucket names such as mybucket or s3test to be available.
  3. Click on the Region drop-down menu and select the geographic area nearest to you.
  4. Click on the Elastic MapReduce link and click on the Create a new Job Flow button. You should see a screen like the following screenshot:
  5. You should now see a screen like the preceding screenshot. Select the Run a sample application radio button and the Word Count (Streaming) menu item from the sample application drop-down box and click on the Continue button.
  6. The next screen, shown in the preceding screenshot, allows us to specify the location of the output produced by running the job. In the edit box for the output location, enter the name of the bucket created in step 2 (garryt1use is the bucket we are using here); then click on the Continue button.
  7. The next screenshot shows the page where we can modify the number and size of the virtual hosts utilized by our job. Confirm that the instance type for each combo box is Small (m1.small), and the number of nodes for the Core group is 2 and for the Task group it is 0. Then click on the Continue button.
  8. This next screenshot involves options we will not be using in this example. For the Amazon EC2 key pair field, select the Proceed without key pair menu item and click on the No radio button for the Enable Debugging field. Ensure that the Keep Alive radio button is set to No and click on the Continue button.
  9. The next screen, shown in the preceding screenshot, is one we will not be doing much with right now. Confirm that the Proceed with no Bootstrap Actions radio button is selected and click on the Continue button.
  10. Confirm the job flow specifications are as expected and click on the Create Job Flow button. Then click on the View my Job Flows and check status buttons. This will give a list of your job flows; you can filter to show only running or completed jobs. The default is to show all, as in the example shown in the following screenshot:
  11. Occasionally hit the Refresh button until the status of the listed job, Running or Starting, changes to Complete; then click its checkbox to see details of the job flow, as shown in the following screenshot:
  12. Click the S3 tab and select the bucket you created for the output location. You will see it has a single entry called wordcount, which is a directory. Right-click on that and select Open. Then do the same until you see a list of actual files following the familiar Hadoop part-nnnnn naming scheme, as shown in the following screenshot:

    Right click on part-00000 and open it. It should look something like this:

    a              14716
    aa             52
    aakar          3
    aargau         3
    abad           3
    abandoned      46
    abandonment    6
    abate          9
    abauj          3
    abbassid       4
    abbes          3
    abbl           3
    …

    Does this type of output look familiar?

What just happened?

The first step deals with S3, and not EMR. S3 is a scalable storage service that allows you to store files (called objects) within containers called buckets, and to access objects by their bucket and object key (that is, name). The model is analogous to the usage of a filesystem, and though there are underlying differences, they are unlikely to be important within this book.

S3 is where you will place the MapReduce programs and source data you want to process in EMR, and where the output and logs of EMR Hadoop jobs will be stored. There is a plethora of third-party tools to access S3, but here we are using the AWS management console, a browser interface to most AWS services.

Though we suggested you choose the nearest geographic region for S3, this is not required; non-US locations will typically give better latency for customers located nearer to them, but they also tend to have a slightly higher cost. The decision of where to host your data and applications is one you need to make after considering all these factors.

After creating the S3 bucket, we moved to the EMR console and created a new job flow. This term is used within EMR to refer to a data processing task. As we will see, this can be a one-time deal where the underlying Hadoop cluster is created and destroyed on demand or it can be a long-running cluster on which multiple jobs are executed.

We left the default job flow name and then selected the use of an example application, in this case, the Python implementation of WordCount. The term Hadoop Streaming refers to a mechanism allowing scripting languages to be used to write map and reduce tasks, but the functionality is the same as the Java WordCount we used earlier.

The form to specify the job flow requires a location for the source data, program, map and reduce classes, and a desired location for the output data. For the example we just saw, most of the fields were prepopulated; and, as can be seen, there are clear similarities to what was required when running local Hadoop from the command line.

By not selecting the Keep Alive option, we chose a Hadoop cluster that would be created specifically to execute this job, and destroyed afterwards. Such a cluster will have a longer startup time but will minimize costs. If you choose to keep the job flow alive, you will see additional jobs executed more quickly as you don't have to wait for the cluster to start up. But you will be charged for the underlying EC2 resources until you explicitly terminate the job flow.

We confirmed that we did not need to add any bootstrap options, having already selected the number and types of hosts we wanted to deploy into our Hadoop cluster. EMR distinguishes between three different groups of hosts:

  • Master group: This is a controlling node hosting the NameNode and the JobTracker. There is only 1 of these.
  • Core group: These are nodes running both HDFS DataNodes and MapReduce TaskTrackers. The number of hosts is configurable.
  • Task group: These hosts don't hold HDFS data but do run TaskTrackers and can provide more processing horsepower. The number of hosts is configurable.

The type of host refers to different classes of hardware capability, the details of which can be found on the EC2 page. Larger hosts are more powerful but have a higher cost. Currently, by default, the total number of hosts in a job flow must be 20 or less, though Amazon has a simple form to request higher limits.

After confirming that all is as expected, we launch the job flow and monitor it on the console until the status changes to COMPLETED. At this point, we go back to S3, look inside the bucket we specified as the output destination, and examine the output of our WordCount job, which should look very similar to the output of a local Hadoop WordCount.

An obvious question is where did the source data come from? This was one of the prepopulated fields in the job flow specification we saw during the creation process. For nonpersistent job flows, the most common model is for the source data to be read from a specified S3 source location and the resulting data written to the specified result S3 bucket.

That is it! The AWS management console allows fine-grained control of services such as S3 and EMR from the browser. Armed with nothing more than a browser and a credit card, we can launch Hadoop jobs to crunch data without ever having to worry about any of the mechanics around installing, running, or managing Hadoop.

Have a go hero – other EMR sample applications

EMR provides several other sample applications. Why not try some of them as well?

Other ways of using EMR

Although a powerful and impressive tool, the AWS management console is not always how we want to access S3 and run EMR jobs. As with all AWS services, there are both programmatic and command-line tools to use the services.

AWS credentials

Before using either programmatic or command-line tools, however, we need to look at how an account holder authenticates for AWS to make such requests. As these are chargeable services, we really do not want anyone else to make requests on our behalf. Note that as we logged directly into the AWS management console with our AWS account in the preceding example, we did not have to worry about this.

Each AWS account has several identifiers that are used when accessing the various services:

  • Account ID: Each AWS account has a numeric ID.
  • Access key: Each account has an associated access key that is used to identify the account making the request.
  • Secret access key: The partner to the access key is the secret access key. The access key is not a secret and could be exposed in service requests, but the secret access key is what you use to validate yourself as the account owner.
  • Key pairs: These are the key pairs used to log in to EC2 hosts. It is possible to either generate public/private key pairs within EC2 or to import externally generated keys into the system.

If this sounds confusing, it's because it is, at least at first. When using a tool to access an AWS service, however, there's usually a single up-front step of adding the right credentials to a configuration file, and then everything just works. That said, if you do decide to explore programmatic or command-line tools, it will be worth a little time investment to read the documentation for each service to understand how its security works.

The EMR command-line tools

In this book, we will not do anything with S3 and EMR that cannot be done from the AWS management console. However, when working with operational workloads, looking to integrate into other workflows, or automating service access, a browser-based tool is not appropriate, regardless of how powerful it is. Using the direct programmatic interfaces to a service provides the most granular control but requires the most effort.

For many services, Amazon provides a group of command-line tools that offer a useful way of automating access to AWS services while minimizing the amount of required development. The Elastic MapReduce command-line tools, linked from the main EMR page, are worth a look if you want a more CLI-based interface to EMR but don't want to write custom code just yet.
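As a purely illustrative sketch of what the Ruby elastic-mapreduce client looks like in use (the bucket paths are placeholders and the exact flag names vary between versions of the tool, so treat this as indicative rather than definitive):

$ elastic-mapreduce --create --stream \
    --input s3n://mybucket/input \
    --output s3n://mybucket/output \
    --mapper s3n://mybucket/mapper.py \
    --reducer s3n://mybucket/reducer.py
$ elastic-mapreduce --list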

The AWS ecosystem

Each AWS service also has a plethora of third-party tools, services, and libraries that can provide different ways of accessing the service, provide additional functionality, or offer new utility programs. Check out the developer tools hub at http://aws.amazon.com/developertools as a starting point.

Comparison of local versus EMR Hadoop

After our first experience of both a local Hadoop cluster and its equivalent in EMR, this is a good point at which to consider the differences between the two approaches.

As may be apparent, the key differences are not really about capability; if all we want is an environment to run MapReduce jobs, either approach is completely suitable. Instead, the distinguishing characteristics revolve around a topic we touched on in Chapter 1, What It's All About: whether you prefer a cost model that involves upfront infrastructure costs and ongoing maintenance effort, or a pay-as-you-go model with a lower maintenance burden along with rapid and conceptually infinite scalability. Other than the cost decisions, there are a few things to keep in mind:

  • EMR supports specific versions of Hadoop and has a policy of upgrading over time. If you have a need for a specific version, in particular if you need the latest and greatest versions immediately after release, then the lag before these are live on EMR may be unacceptable.
  • You can start up a persistent EMR job flow and treat it much as you would a local Hadoop cluster, logging into the hosting nodes and tweaking their configuration. If you find yourself doing this, it's worth asking whether that level of control is really needed and, if so, whether it is preventing you from getting all the cost-model benefits of a move to EMR.
  • If it does come down to a cost consideration, remember to factor in all the hidden costs of a local cluster that are often forgotten. Think about the costs of power, space, cooling, and facilities. Not to mention the administration overhead, which can be nontrivial if things start breaking in the early hours of the morning.

Summary

We covered a lot of ground in this chapter, in regards to getting a Hadoop cluster up and running and executing MapReduce programs on it.

Specifically, we covered the prerequisites for running Hadoop on local Ubuntu hosts. We also saw how to install and configure a local Hadoop cluster in either standalone or pseudo-distributed modes. Then, we looked at how to access the HDFS filesystem and submit MapReduce jobs. We then moved on and learned what accounts are needed to access Elastic MapReduce and other AWS services.

We saw how to browse and create S3 buckets and objects using the AWS management console, and also how to create a job flow and use it to execute a MapReduce job on an EMR-hosted Hadoop cluster. We also discussed other ways of accessing AWS services and studied the differences between local and EMR-hosted Hadoop.

Now that we have learned about running Hadoop locally or on EMR, we are ready to start writing our own MapReduce programs, which is the topic of the next chapter.