Hadoop:Data Processing and Modelling
上QQ阅读APP看书,第一时间看更新

Chapter 7. Keeping Things Running

Having a Hadoop cluster is not all about writing interesting programs to do clever data analysis. You also need to maintain the cluster, and keep it tuned and ready to do the data crunching you want.

In this chapter we will cover:

  • More about Hadoop configuration properties
  • How to select hardware for your cluster
  • How Hadoop security works
  • Managing the NameNode
  • Managing HDFS
  • Managing MapReduce
  • Scaling the cluster

Although these topics are operationally focused, they do give us an opportunity to explore some aspects of Hadoop we have not looked at before. Therefore, even if you won't be personally managing the cluster, there should be useful information here for you too.

A note on EMR

One of the main benefits of using cloud services such as those offered by Amazon Web Services is that much of the maintenance overhead is borne by the service provider. Elastic MapReduce can create Hadoop clusters tied to the execution of a single task (non-persistent job flows) or allow long-running clusters that can be used for multiple jobs (persistent job flows). When non-persistent job flows are used, the actual mechanics of how the underlying Hadoop cluster is configured and run are largely invisible to the user. Consequently, users employing non-persistent job flows will not need to consider many of the topics in this chapter. If you are using EMR with persistent job flows, many topics (but not all) do become relevant.

We will generally talk about local Hadoop clusters in this chapter. If you need to reconfigure a persistent job flow, use the same Hadoop properties but set them as described in Chapter 3, Writing MapReduce Jobs.

Hadoop configuration properties

Before we look at running the cluster, let's talk a little about Hadoop's configuration properties. We have been introducing many of these along the way, and there are a few additional points worth considering.

Default values

One of the most mystifying things to a new Hadoop user is the large number of configuration properties. Where do they come from, what do they mean, and what are their default values?

If you have the full Hadoop distribution—that is, not just the binary distribution—the following XML files will answer your questions:

  • Hadoop/src/core/core-default.xml
  • Hadoop/src/hdfs/hdfs-default.xml
  • Hadoop/src/mapred/mapred-default.xml

Time for action – browsing default properties

Fortunately, the XML documents are not the only way of looking at the default values; there are also more readable HTML versions, which we'll now take a quick look at.

These files are not included in the Hadoop binary-only distribution; if you are using that, you can also find these files on the Hadoop website.

  1. Point your browser at the docs/core-default.html file within your Hadoop distribution directory and browse its contents. It should look like the next screenshot:
  2. Now, similarly, browse these other files:
    • Hadoop/docs/hdfs-default.html
    • Hadoop/docs/mapred-default.html

What just happened?

As you can see, each property has a name, default value, and a brief description. You will also see there are indeed a very large number of properties. Do not expect to understand all of these now, but do spend a little time browsing to get a flavor for the type of customization allowed by Hadoop.

Additional property elements

When we have previously set properties in the configuration files, we have used an XML element of the following form:

<property>
<name>the.property.name</name>
<value>The property value</value>
</property>

There are an additional two optional XML elements we can add, description and final. A fully described property using these additional elements now looks as follows:

<property>
<name>the.property.name</name>
<value>The default property value</value>
<description>A textual description of the property</description>
<final>Boolean</final>
</property>

The description element is self-explanatory and provides the location for the descriptive text we saw for each property in the preceding HTML files.

The final property has a similar meaning as in Java: any property marked final cannot be overridden by values in any other files or by other means; we will see this shortly. Use this for those properties where for performance, integrity, security, or other reasons, you wish to enforce cluster-wide values.

Default storage location

You will see properties that modify where Hadoop stores its data on both the local disk and HDFS. There's one property used as the basis for many others hadoop.tmp.dir, which is the root location for all Hadoop files, and its default value is /tmp.

Unfortunately, many Linux distributions—including Ubuntu—are configured to remove the contents of this directory on each reboot. This means that if you do not override this property, you will lose all your HDFS data on the next host reboot. Therefore, it is worthwhile to set something like the following in core-site.xml:

<property>
<name>hadoop.tmp.dir</name>
<value>/var/lib/hadoop</value>
</property>

Remember to ensure the location is writable by the user who will start Hadoop, and that the disk the directory is located on has enough space. As you will see later, there are a number of other properties that allow more granular control of where particular types of data are stored.

Where to set properties

We have previously used the configuration files to specify new values for Hadoop properties. This is fine, but does have an overhead if we are trying to find the best value for a property or are executing a job that requires special handling.

It is possible to use the JobConf class to programmatically set configuration properties on the executing job. There are two types of methods supported, the first being those that are dedicated to setting a specific property, such as the ones we've seen for setting the job name, input, and output formats, among others. There are also methods to set properties such as the preferred number of map and reduce tasks for the job.

In addition, there are a set of generic methods, such as the following:

  • Void set(String key, String value);
  • Void setIfUnset(String key, String value);
  • Void setBoolean( String key, Boolean value);
  • Void setInt(String key, int value);

These are more flexible and do not require specific methods to be created for each property we wish to modify. However, they also lose compile time checking meaning you can use an invalid property name or assign the wrong type to a property and will only find out at runtime.

Note

This ability to set property values both programmatically and in the configuration files is an important reason for the ability to mark a property as final. For properties for which you do not want any submitted job to have the ability to override them, set them as final within the master configuration files.

Setting up a cluster

Before we look at how to keep a cluster running, let's explore some aspects of setting it up in the first place.

How many hosts?

When considering a new Hadoop cluster, one of the first questions is how much capacity to start with. We know that we can add additional nodes as our needs grow, but we also want to start off in a way that eases that growth.

There really is no clear-cut answer here, as it will depend largely on the size of the data sets you will be processing and the complexity of the jobs to be executed. The only near-absolute is to say that if you want a replication factor of n, you should have at least that many nodes. Remember though that nodes will fail, and if you have the same number of nodes as the default replication factor then any single failure will push blocks into an under-replicated state. In most clusters with tens or hundreds of nodes, this is not a concern; but for very small clusters with a replication factor of 3, the safest approach would be a five-node cluster.

Calculating usable space on a node

An obvious starting point for the required number of nodes is to look at the size of the data set to be processed on the cluster. If you have hosts with 2 TB of disk space and a 10 TB data set, the temptation would be to assume that five nodes is the minimum number needed.

This is incorrect, as it omits consideration of the replication factor and the need for temporary space. Recall that the output of mappers is written to the local disk to be retrieved by the reducers. We need to account for this non-trivial disk usage.

A good rule of thumb would be to assume a replication factor of 3, and that 25 percent of what remains should be accounted for as temporary space. Using these assumptions, the calculation of the needed cluster for our 10 TB data set on 2 TB nodes would be as follows:

  • Divide the total storage space on a node by the replication factor:

    2 TB/3 = 666 GB

  • Reduce this figure by 25 percent to account for temp space:

    666 GB * 0.75 = 500 GB

  • Each 2 TB node therefore has approximately 500 GB (0.5 TB) of usable space
  • Divide the data set size by this figure:

    10 TB / 500 GB = 20

So our 10 TB data set will likely need a 20 node cluster as a minimum, four times our naïve estimate.

This pattern of needing more nodes than expected is not unusual and should be remembered when considering how high-spec you want the hosts to be; see the Sizing hardware section later in this chapter.

Location of the master nodes

The next question is where the NameNode, JobTracker, and SecondaryNameNode will live. We have seen that a DataNode can run on the same host as the NameNode and the TaskTracker can co-exist with the JobTracker, but this is unlikely to be a great setup for a production cluster.

As we will see, the NameNode and SecondaryNameNode have some specific resource requirements, and anything that affects their performance is likely to slow down the entire cluster operation.

The ideal situation would be to have the NameNode, JobTracker, and SecondaryNameNode on their own dedicated hosts. However, for very small clusters, this would result in a significant increase in the hardware footprint without necessarily reaping the full benefit.

If at all possible, the first step should be to separate the NameNode, JobTracker, and SecondaryNameNode onto a single dedicated host that does not have any DataNode or TaskTracker processes running. As the cluster continues to grow, you can add an additional server host and then move the NameNode onto its own host, keeping the JobTracker and SecondaryNameNode co-located. Finally, as the cluster grows yet further, it will make sense to move to full separation.

Note

As discussed in Chapter 6, Keeping Things Running, Hadoop 2.0 will split the Secondary NameNode into Backup NameNodes and Checkpoint NameNodes. Best practice is still evolving, but aiming towards having a dedicated host each for the NameNode and at least one Backup NameNode looks sensible.

Sizing hardware

The amount of data to be stored is not the only consideration regarding the specification of the hardware to be used for the nodes. Instead, you have to consider the amount of processing power, memory, storage types, and networking available.

Much has been written about selecting hardware for a Hadoop cluster, and once again there is no single answer that will work for all cases. The big variable is the types of MapReduce tasks that will be executed on the data and, in particular, if they are bounded by CPU, memory, I/O, or something else.

Processor / memory / storage ratio

A good way of thinking of this is to look at potential hardware in terms of the CPU / memory / storage ratio. So, for example, a quad-core host with 8 GB memory and 2 TB storage could be thought of as having two cores and 4 GB memory per 1 TB of storage.

Then look at the types of MapReduce jobs you will be running, does that ratio seem appropriate? In other words, does your workload require proportionally more of one of these resources or will a more balanced configuration be sufficient?

This is, of course, best assessed by prototyping and gathering metrics, but that isn't always possible. If not, consider what part of the job is the most expensive. For example, some of the jobs we have seen are I/O bound and read data from the disk, perform simple transformations, and then write results back to the disk. If this was typical of our workload, we could likely use hardware with more storage—especially if it was delivered by multiple disks to increase I/O—and use less CPU and memory.

Conversely, jobs that perform very heavy number crunching would need more CPU, and those that create or use large data structures would benefit from memory.

Think of it in terms of limiting factors. If your job was running, would it be CPU-bound (processors at full capacity; memory and I/O to spare), memory-bound (physical memory full and swapping to disk; CPU and I/O to spare), or I/O-bound (CPU and memory to spare, but data being read/written to/from disk at maximum possible speed)? Can you get hardware that eases that bound?

This is of course a limitless process, as once you ease one bound another will manifest itself. So always remember that the idea is to get a performance profile that makes sense in the context of your likely usage scenario.

What if you really don't know the performance characteristics of your jobs? Ideally, try to find out, do some prototyping on any hardware you have and use that to inform your decision. However, if even that is not possible, you will have to go for a configuration and try it out. Remember that Hadoop supports heterogeneous hardware—though having uniform specifications makes your life easier in the end—so build the cluster to the minimum possible size and assess the hardware. Use this knowledge to inform future decisions regarding additional host purchases or upgrades of the existing fleet.

EMR as a prototyping platform

Recall that when we configured a job on Elastic MapReduce we chose the type of hardware for both the master and data/task nodes. If you plan to run your jobs on EMR, you have a built-in capability to tweak this configuration to find the best combination of hardware specifications to price and execution speed.

However, even if you do not plan to use EMR full-time, it can be a valuable prototyping platform. If you are sizing a cluster but do not know the performance characteristics of your jobs, consider some prototyping on EMR to gain better insight. Though you may end up spending money on the EMR service that you had not planned, this will likely be a lot less than the cost of finding out you have bought completely unsuitable hardware for your cluster.

Special node requirements

Not all hosts have the same hardware requirements. In particular, the host for the NameNode may look radically different to those hosting the DataNodes and TaskTrackers.

Recall that the NameNode holds an in-memory representation of the HDFS filesystem and the relationship between files, directories, blocks, nodes, and various metadata concerning all of this. This means that the NameNode will tend to be memory bound and may require larger memory than any other host, particularly for very large clusters or those with a huge number of files. Though 16 GB may be a common memory size for DataNodes/TaskTrackers, it's not unusual for the NameNode host to have 64 GB or more of memory. If the NameNode ever ran out of physical memory and started to use swap space, the impact on cluster performance would likely be severe.

However, though 64 GB is large for physical memory, it's tiny for modern storage, and given that the filesystem image is the only data stored by the NameNode, we don't need the massive storage common on the DataNode hosts. We care much more about NameNode reliability so are likely to have several disks in a redundant configuration. Consequently, the NameNode host will benefit from multiple small drives (for redundancy) rather than large drives.

Overall, therefore, the NameNode host is likely to look quite different from the other hosts in the cluster; this is why we made the earlier recommendations regarding moving the NameNode to its own host as soon as budget/space allows, as its unique hardware requirements are more easily satisfied this way.

Note

The SecondaryNameNode (or CheckpointNameNode and BackupNameNode in Hadoop 2.0) share the same hardware requirements as the NameNode. You can run it on a more generic host while in its secondary capacity, but if you do ever need to switch and make it the NameNode due to failure of the primary hardware, you may be in trouble.

Storage types

Though you will find strong opinions on some of the previous points regarding the relative importance of processor, memory, and storage capacity, or I/O, such arguments are usually based around application requirements and hardware characteristics and metrics. Once we start discussing the type of storage to be used, however, it is very easy to get into flame war situations, where you will find extremely entrenched opinions.

Commodity versus enterprise class storage

The first argument will be over whether it makes most sense to use hard drives aimed at the commodity/consumer segments or those aimed at enterprise customers. The former (primarily SATA disks) are larger, cheaper, and slower, and have lower quoted figures for mean time between failures (MTBF). Enterprise disks will use technologies such as SAS or Fiber Channel, and will on the whole be smaller, more expensive, faster, and have higher quoted MTBF figures.

Single disk versus RAID

The next question will be on how the disks are configured. The enterprise-class approach would be to use Redundant Arrays of Inexpensive Disks (RAID) to group multiple disks into a single logical storage device that can quietly survive one or more disk failures. This comes with the cost of a loss in overall capacity and an impact on the read/write rates achieved.

The other position is to treat each disk independently to maximize total storage and aggregate I/O, at the cost of a single disk failure causing host downtime.

Finding the balance

The Hadoop architecture is, in many ways, predicated on the assumption that hardware will fail. From this perspective, it is possible to argue that there is no need to use any traditional enterprise-focused storage features. Instead, use many large, cheap disks to maximize the total storage and read and write from them in parallel to do likewise for I/O throughput. A single disk failure may cause the host to fail, but the cluster will, as we have seen, work around this failure.

This is a completely valid argument and in many cases makes perfect sense. What the argument ignores, however, is the cost of bringing a host back into service. If your cluster is in the next room and you have a shelf of spare disks, host recovery will likely be a quick, painless, and inexpensive task. However, if you have your cluster hosted by a commercial collocation facility, any hands-on maintenance may cost a lot more. This is even more the case if you are using fully-managed servers where you have to pay the provider for maintenance tasks. In such a situation, the extra cost and reduced capacity and I/O from using RAID may make sense.

Network storage

One thing that will almost never make sense is to use networked storage for your primary cluster storage. Be it block storage via a Storage Area Network (SAN) or file-based via Network File System (NFS) or similar protocols, these approaches constrain Hadoop by introducing unnecessary bottlenecks and additional shared devices that would have a critical impact on failure.

Sometimes, however, you may be forced for non-technical reasons to use something like this. It's not that it won't work, just that it changes how Hadoop will perform in regards to speed and tolerance to failures, so be sure you understand the consequences if this happens.

Hadoop networking configuration

Hadoop's support of networking devices is not as sophisticated as it is for storage, and consequently you have fewer hardware choices to make compared to CPU, memory, and storage setup. The bottom line is that Hadoop can currently support only one network device and cannot, for example, use all 4-gigabit Ethernet connections on a host for an aggregate of 4-gigabit throughput. If you need network throughput greater than that provided by a single-gigabit port then, unless your hardware or operating system can present multiple ports as a single device to Hadoop, the only option is to use a 10-gigabit Ethernet device.

How blocks are placed

We have talked a lot about HDFS using replication for redundancy, but have not explored how Hadoop chooses where to place the replicas for a block.

In most traditional server farms, the various hosts (as well as networking and other devices) are housed in standard-sized racks that stack the equipment vertically. Each rack will usually have a common power distribution unit that feeds it and will often have a network switch that acts as the interface between the broader network and all the hosts in the rack.

Given this setup, we can identify three broad types of failure:

  • Those that affect a single host (for example, CPU/memory/disk/motherboard failure)
  • Those that affect a single rack (for example, power unit or switch failure)
  • Those that affect the entire cluster (for example, larger power/network failures, cooling/environmental outages)

Note

Remember that Hadoop currently does not support a cluster that is spread across multiple data centers, so instances of the third type of failure will quite likely bring down your cluster.

By default, Hadoop will treat each node as if it is in the same physical rack. This implies that the bandwidth and latency between any pair of hosts is approximately equal and that each node is equally likely to suffer a related failure as any other.

Rack awareness

If, however, you do have a multi-rack setup, or another configuration that otherwise invalidates the previous assumptions, you can add the ability for each node to report its rack ID to Hadoop, which will then take this into account when placing replicas.

In such a setup, Hadoop tries to place the first replica of a node on a given host, the second on another within the same rack, and the third on a host in a different rack.

This strategy provides a good balance between performance and availability. When racks contain their own network switches, communication between hosts inside the rack often has lower latency than that with external hosts. This strategy places two replicas within a rack to ensure maximum speed of writing for these replicas, but keeps one outside the rack to provide redundancy in the event of a rack failure.

The rack-awareness script

If the topology.script.file.name property is set and points to an executable script on the filesystem, it will be used by the NameNode to determine the rack for each host.

Note that the property needs to be set and the script needs to exist only on the NameNode host.

The NameNode will pass to the script the IP address of each node it discovers, so the script is responsible for a mapping from node IP address to rack name.

If no script is specified, each node will be reported as a member of a single default rack.

Time for action – examining the default rack configuration

Let's take a look at how the default rack configuration is set up in our cluster.

  1. Execute the following command:
    $ Hadoop fsck -rack
    
  2. The result should include output similar to the following:
    Default replication factor: 3
    Average block replication: 3.3045976
    Corrupt blocks: 0
    Missing replicas: 18 (0.5217391 %)
    Number of data-nodes: 4
    Number of racks: 1
    The filesystem under path '/' is HEALTHY
    

What just happened?

Both the tool used and its output are of interest here. The tool is hadoop fsck, which can be used to examine and fix filesystem problems. As can be seen, this includes some information not dissimilar to our old friend hadoop dfsadmin, though that tool is focused more on the state of each node in detail while hadoop fsck reports on the internals of the filesystem as a whole.

One of the things it reports is the total number of racks in the cluster, which, as seen in the preceding output, has the value 1, as expected.

Note

This command was executed on a cluster that had recently been used for some HDFS resilience testing. This explains the figures for average block replication and under-replicated blocks.

If a block ends up with more than the required number of replicas due to a host temporarily failing, the host coming back into service will put the block above the minimum replication factor. Along with ensuring that blocks have replicas added to meet the replication factor, Hadoop will also delete excess replicas to return blocks to the replication factor.

Time for action – adding a rack awareness script

We can enhance the default flat rack configuration by creating a script that derives the rack location for each host.

  1. Create a script in the Hadoop user's home directory on the NameNode host called rack-script.sh, containing the following text. Remember to change the IP address to one of your HDFS nodes.
    #!/bin/bash
    
    if [ $1 = "10.0.0.101" ]; then
        echo -n "/rack1 "
    else
        echo -n "/default-rack "
    fi
  2. Make this script executable.
    $ chmod +x rack-script.sh
    
  3. Add the following property to core-site.xml on the NameNode host:
    <property>
    <name>topology.script.file.name</name>
    <value>/home/Hadoop/rack-script.sh</value>
    </property>
  4. Restart HDFS.
    $ start-dfs.sh
    
  5. Check the filesystem via fsck.
    $ Hadoop fsck –rack
    

    The output of the preceding command can be shown in the following screenshot:

What just happened?

We first created a simple script that returns one value for a named node and a default value for all others. We placed this on the NameNode host and added the needed configuration property to the NameNode core-site.xml file.

After starting HDFS, we used hadoop fsck to report on the filesystem and saw that we now have a two-rack cluster. With this knowledge, Hadoop will now employ more sophisticated block placement strategies, as described previously.

Tip

Using an external host file

A common approach is to keep a separate data file akin to the /etc/hosts file on Unix and use this to specify the IP/rack mapping, one per line. This file can then be updated independently and read by the rack-awareness script.

What is commodity hardware anyway?

Let's revisit the question of the general characteristics of the hosts used for your cluster, and whether they should look more like a commodity white box server or something built for a high-end enterprise environment.

Part of the problem is that "commodity" is an ambiguous term. What looks cheap and cheerful for one business may seem luxuriously high-end for another. We suggest considering the following points to keep in mind when selecting hardware and then remaining happy with your decision:

  • With your hardware, are you paying a premium for reliability features that duplicate some of Hadoop's fault-tolerance capabilities?
  • Are the higher-end hardware features you are paying for addressing the need or risk that you have confirmed is realistic in your environment?
  • Have you validated the cost of the higher-end hardware to be higher than dealing with cheaper / less reliable hardware?

Pop quiz – setting up a cluster

Q1. Which of the following is most important when selecting hardware for your new Hadoop cluster?

  1. The number of CPU cores and their speed.
  2. The amount of physical memory.
  3. The amount of storage.
  4. The speed of the storage.
  5. It depends on the most likely workload.

Q2. Why would you likely not want to use network storage in your cluster?

  1. Because it may introduce a new single point of failure.
  2. Because it most likely has approaches to redundancy and fault-tolerance that may be unnecessary given Hadoop's fault tolerance.
  3. Because such a single device may have inferior performance to Hadoop's use of multiple local disks simultaneously.
  4. All of the above.

Q3. You will be processing 10 TB of data on your cluster. Your main MapReduce job processes financial transactions, using them to produce statistical models of behavior and future forecasts. Which of the following hardware choices would be your first choice for the cluster?

  1. 20 hosts each with fast dual-core processors, 4 GB memory, and one 500 GB disk drive.
  2. 30 hosts each with fast dual-core processors, 8 GB memory, and two 500 GB disk drives.
  3. 30 hosts each with fast quad-core processors, 8 GB memory, and one 1 TB disk drive.
  4. 40 hosts each with 16 GB memory, fast quad-core processors, and four 1 TB disk drives.

Cluster access control

Once you have the shiny new cluster up and running, you need to consider questions of access and security. Who can access the data on the cluster—is there sensitive data that you really don't want the whole user base to see?

The Hadoop security model

Until very recently, Hadoop had a security model that could, at best, be described as "marking only". It associated an owner and group with each file but, as we'll see, did very little validation of a given client connection. Strong security would manage not only the markings given to a file but also the identities of all connecting users.

Time for action – demonstrating the default security

When we have previously shown listings of files, we have seen user and group names for them. However, we have not really explored what that means. Let's do so.

  1. Create a test text file in the Hadoop user's home directory.
    $ echo "I can read this!" > security-test.txt 
    $ hadoop fs -put security-test.txt security-test.txt 
    
  2. Change the permissions on the file to be accessible only by the owner.
    $ hadoop fs -chmod 700 security-test.txt 
    $ hadoop fs -ls
    

    The output of the preceding command can be shown in the following screenshot:

  3. Confirm you can still read the file.
    $ hadoop fs -cat security-test.txt 
    

    You'll see the following line on the screen:

    I can read this!
    
  4. Connect to another node in the cluster and try to read the file from there.
    $ ssh node2
    $ hadoop fs -cat security-test.txt 
    

    You'll see the following line on the screen:

    I can read this!
    
  5. Log out from the other node.
    $ exit
    
  6. Create a home directory for another user and give them ownership.
    $ hadoop m[Kfs -mkdir /user/garry
    $ hadoop fs -chown garry /user/garry
    $ hadoop fs -ls /user
    

    The output of the preceding command can be shown in the following screenshot:

  7. Switch to that user.
    $ su garry
    
  8. Try to read the test file in the Hadoop user's home directory.
    $ hadoop/bin/hadoop fs -cat /user/hadoop/security-test.txt
    cat: org.apache.hadoop.security.AccessControlException: Permission denied: user=garry, access=READ, inode="security-test.txt":hadoop:supergroup:rw-------
    
  9. Place a copy of the file in this user's home directory and again make it accessible only by the owner.
    $ Hadoop/bin/Hadoop fs -put security-test.txt security-test.txt
    $ Hadoop/bin/Hadoop fs -chmod 700 security-test.txt
    $ hadoop/bin/hadoop fs -ls 
    

    The output of the preceding command can be shown in following screenshot:

  10. Confirm this user can access the file.
    $ hadoop/bin/hadoop fs -cat security-test.txt 
    

    You'll see the following line on the screen:

    I can read this!
    
  11. Return to the Hadoop user.
    $ exit
    
  12. Try and read the file in the other user's home directory.
    $ hadoop fs -cat /user/garry/security-test.txt
    

    You'll see the following line on the screen:

    I can read this!
    

What just happened?

We firstly used our Hadoop user to create a test file in its home directory on HDFS. We used the -chmod option to hadoop fs, which we have not seen before. This is very similar to the standard Unix chmod tool that gives various levels of read/write/execute access to the file owner, group members, and all users.

We then went to another host and tried to access the file, again as the Hadoop user. Not surprisingly, this worked. But why? What did Hadoop know about the Hadoop user that allowed it to give access to the file?

To explore this, we then created another home directory on HDFS (you can use any other account on the host you have access to), and gave it ownership by using the -chown option to hadoop fs. This should once again look similar to standard Unix -chown. Then we switched to this user and attempted to read the file stored in the Hadoop user's home directory. This failed with the security exception shown before, which is again what we expected. Once again, we copied a test file into this user's home directory and made it only accessible by the owner.

But we then muddied the waters by switching back to the Hadoop user and tried to access the file in the other account's home directory, which, surprisingly, worked.

User identity

The answer to the first part of the puzzle is that Hadoop uses the Unix ID of the user executing the HDFS command as the user identity on HDFS. So any commands executed by a user called alice will create files with an owner named alice and will only be able to read or write files to which this user has the correct access.

The security-minded will realize that to access a Hadoop cluster all one needs to do is create a user with the same name as an already existing HDFS user on any host that can connect to the cluster. So, for instance, in the previous example, any user named hadoop created on any host that can access the NameNode can read all files accessible by the user hadoop, which is actually even worse than it seems.

The super user

The previous step saw the Hadoop user access another user's files. Hadoop treats the user ID that started the cluster as the super user, and gives it various privileges, such as the ability to read, write, and modify any file on HDFS. The security-minded will realize even more the risk of having users called hadoop randomly created on hosts outside the Hadoop administrator's control.

More granular access control

The preceding situation has caused security to be a major weakness in Hadoop since its inception. The community has, however, not been standing still, and after much work the very latest versions of Hadoop support a more granular and stronger security model.

To avoid reliance on simple user IDs, the developers need to learn the user identity from somewhere, and the Kerberos system was chosen with which to integrate. This does require the establishment and maintenance of services outside the scope of this book, but if such security is important to you, consult the Hadoop documentation. Note that this support does allow integration with third-party identity systems such as Microsoft Active Directory, so it is quite powerful.

Working around the security model via physical access control

If the burden of Kerberos is too great, or security is a nice-to-have rather than an absolute, there are ways of mitigating the risk. One favored by me is to place the entire cluster behind a firewall with tight access control. In particular, only allow access to the NameNode and JobTracker services from a single host that will be treated as the cluster head node and to which all users connect.

Tip

Accessing Hadoop from non-cluster hosts

Hadoop does not need to be running on a host for it to use the command-line tools to access HDFS and run MapReduce jobs. As long as Hadoop is installed on the host and its configuration files have the correct locations of the NameNode and JobTracker, these will be found when invoking commands such as Hadoop fs and Hadoop jar.

This model works because only one host is used to interact with Hadoop; and since this host is controlled by the cluster administrator, normal users should be unable to create or access other user accounts.

Remember that this approach is not providing security. It is putting a hard shell around a soft system that reduces the ways in which the Hadoop security model can be subverted.

Managing the NameNode

Let's do some more risk reduction. In piece of data on the cluster. This is because the NameNode writes a file called fsimage that contains all the metadata for the filesystem and records which blocks comprise which files. If the loss of the NameNode host makes the fsimage unrecoverable, all the HDFS data is likewise lost.

Configuring multiple locations for the fsimage class

The NameNode can be configured to simultaneously write fsimage to multiple locations. This is purely a redundancy mechanism, the same data is written to each location and there is no attempt to use multiple storage devices for increased performance. Instead, the policy is that mltiple copies of fsimage will be harder to lose.

Time for action – adding an additional fsimage location

Let's now configure our NameNode to simultaneously write multiple copies of fsimage to give us our desired data resilience. To do this, we require an NFS-exported directory.

  1. Ensure the cluster is stopped.
    $ stopall.sh
    
  2. Add the following property to Hadoop/conf/core-site.xml, modifying the second path to point to an NFS-mounted location to which the additional copy of NameNode data can be written.
    <property>
    <name>dfs.name.dir</name>
    <value>${hadoop.tmp.dir}/dfs/name,/share/backup/namenode</value>
    </property>
  3. Delete any existing contents of the newly added directory.
    $ rm -f /share/backup/namenode
    
  4. Start the cluster.
    $ start-all.sh
    
  5. Verify that fsimage is being written to both the specified locations by running the md5sum command against the two files specified before (change the following code depending on your configured locations):
    $ md5sum /var/hadoop/dfs/name/image/fsimage
    a25432981b0ecd6b70da647e9b94304a /var/hadoop/dfs/name/image/fsimage
    $ md5sum /share/backup/namenode/image/fsimage
    a25432981b0ecd6b70da647e9b94304a /share/backup/namenode/image/fsimage
    

What just happened?

Firstly, we ensured the cluster was stopped; though changes to the core configuration files are not reread by a running cluster, it's a good habit to get into in case that capability is ever added to Hadoop.

We then added a new property to our cluster configuration, specifying a value for the data.name.dir property. This property takes a list of comma-separated values and writes fsimage to each of these locations. Note how the hadoop.tmp.dir property discussed earlier is de-referenced, as would be seen when using Unix variables. This syntax allows us to base property values on others and inherit changes when the parent properties are updated.

Tip

Do not forget all required locations

The default value for this property is ${Hadoop.tmp.dir}/dfs/name. When adding an additional value, remember to explicitly add the default one also, as shown before. Otherwise, only the single new value will be used for the property.

Before starting the cluster, we ensure the new directory exists and is empty. If the directory doesn't exist, the NameNode will fail to start as should be expected. If, however, the directory was previously used to store NameNode data, Hadoop will also fail to start as it will identify that both directories contain different NameNode data and it does not know which one is correct.

Be careful here! Especially if you are experimenting with various NameNode data locations or swapping back and forth between nodes; you really do not want to accidentally delete the contents from the wrong directory.

After starting the HDFS cluster, we wait for a moment and then use MD5 cryptographic checksums to verify that both locations contain the identical fsimage.

Where to write the fsimage copies

The recommendation is to write fsimage to at least two locations, one of which should be the remote (such as a NFS) filesystem, as in the previous example. fsimage is only updated periodically, so the filesystem does not need high performance.

In our earlier discussion regarding the choice of hardware, we alluded to other considerations for the NameNode host. Because of fsimage criticality, it may be useful to ensure it is written to more than one disk and to perhaps invest in disks with higher reliability, or even to write fsimage to a RAID array. If the host fails, using the copy written to the remote filesystem will be the easiest option; but just in case that has also experienced problems, it's good to have the choice of pulling another disk from the dead host and using it on another to recover the data.

Swapping to another NameNode host

We have ensured that fsimage is written to multiple locations and this is the single most important prerequisite for managing a swap to a different NameNode host. Now we need to actually do it.

This is something you really should not do on a production cluster. Absolutely not when trying for the first time, but even beyond that it's not a risk-free process. But do practice on other clusters and get an idea of what you'll do when disaster strikes.

Having things ready before disaster strikes

You don't want to be exploring this topic for the first time when you need to recover the production cluster. There are several things to do in advance that will make disaster recovery much less painful, not to mention possible:

  • Ensure the NameNode is writing the fsimage to multiple locations, as done before.
  • Decide which host will be the new NameNode location. If this is a host currently being used for a DataNode and TaskTracker, ensure it has the right hardware needed to host the NameNode and that the reduction in cluster performance due to the loss of these workers won't be too great.
  • Make a copy of the core-site.xml and hdfs-site.xml files, place them (ideally) on an NFS location, and update them to point to the new host. Any time you modify the current configuration files, remember to make the same changes to these copies.
  • Copy the slaves file from the NameNode onto either the new host or the NFS share. Also, make sure you keep it updated.
  • Know how you will handle a subsequent failure in the new host. How quickly can you likely repair or replace the original failed host? Which host will be the location of the NameNode (and SecondaryNameNode) in the interim?

Ready? Let's do it!

Time for action – swapping to a new NameNode host

In the following steps we keep the new configuration files on an NFS share mounted to /share/backup and change the paths to match where you have the new files. Also use a different string to grep; we use a portion of the IP address we know isn't shared with any other host in the cluster.

  1. Log on to the current NameNode host and shut down the cluster.
    $ stop-all.sh
    
  2. Halt the host that runs the NameNode.
    $ sudo poweroff
    
  3. Log on to the new NameNode host and confirm the new configuration files have the correct NameNode location.
    $ grep 110 /share/backup/*.xml
    
  4. On the new host, first copy across the slaves file.
    $ cp /share/backup/slaves Hadoop/conf
    
  5. Now copy across the updated configuration files.
    $ cp /share/backup/*site.xml Hadoop/conf
    
  6. Remove any old NameNode data from the local filesystem.
    $ rm -f /var/Hadoop/dfs/name/*
    
  7. Copy the updated configuration files to every node in the cluster.
    $ slaves.sh cp /share/backup/*site.xml Hadoop/conf
    
  8. Ensure each node now has the configuration files pointing to the new NameNode.
    $ slaves.sh grep 110 hadoop/conf/*site.xml
    
  9. Start the cluster.
    $ start-all.sh
    
  10. Check HDFS is healthy, from the command line.
    $ Hadoop fs ls /
    
  11. Verify whether HDFS is accessible from the web UI.

What just happened?

First, we shut down the cluster. This is a little un-representative as most failures see the NameNode die in a much less friendly way, but we do not want to talk about issues of filesystem corruption until later in the chapter.

We then shut down the old NameNode host. Though not strictly necessary, it is a good way of ensuring that nothing accesses the old host and gives you an incorrect view on how well the migration has occurred.

Before copying across files, we take a quick look at core-site.xml and hdfs-site.xml to ensure the correct values are specified for the fs.default.dir property in core-site.xml.

We then prepare the new host by firstly copying across the slaves configuration file and the cluster configuration files and then removing any old NameNode data from the local directory. Refer to the preceding steps about being very careful in this step.

Next, we use the slaves.sh script to get each host in the cluster to copy across the new configuration files. We know our new NameNode host is the only one with 110 in its IP address, so we grep for that in the files to ensure all are up-to-date (obviously, you will need to use a different pattern for your system).

At this stage, all should be well; we start the cluster and access via both the command-line tools and UI to confirm it is running as expected.

Don't celebrate quite yet!

Remember that even with a successful migration to a new NameNode, you aren't done quite yet. You decided in advance how to handle the SecondaryNameNode and which host would be the new designated NameNode host should the newly migrated one fail. To be ready for that, you will need to run through the "Be prepared" checklist mentioned before once more and act appropriately.

Note

Do not forget to consider the chance of correlated failures. Investigate the cause of the NameNode host failure in case it is the start of a bigger problem.

What about MapReduce?

We did not mention moving the JobTracker as that is a much less painful process as shown in .

Have a go hero – swapping to a new NameNode host

Perform a migration of both the NameNode and JobTracker from one host to another.

Managing HDFS

As we saw when killing and restarting nodes in Chapter 6, When Things Break, Hadoop automatically manages many of the availability concerns that would consume a lot of effort on a more traditional filesystem. There are some things, however, that we still need to be aware of.

Where to write data

Just as the NameNode can have multiple locations for storage of fsimage specified via the dfs.name.dir property, we explored earlier that there is a similar-appearing property called dfs.data.dir that allows HDFS to use multiple data locations on a host, which we will look at now.

This is a useful mechanism that works very differently from the NameNode property. If multiple directories are specified in dfs.data.dir, Hadoop will view these as a series of independent locations that it can use in parallel. This is useful if you have multiple physical disks or other storage devices mounted at distinct points on the filesystem. Hadoop will use these multiple devices intelligently, maximizing not only the total storage capacity but also by balancing reads and writes across the locations to gain maximum throughput. As mentioned in the Storage types section, this is the approach that maximizes these factors at the cost of a single disk failure causing the whole host to fail.

Using balancer

Hadoop works hard to place data blocks on HDFS in a way that maximizes both performance and redundancy. However, in certain situations, the cluster can become unbalanced, with a large discrepancy between the data held on the various nodes. The classic situation that causes this is when a new node is added to the cluster. By default, Hadoop will consider the new node as a candidate for block placement alongside all other nodes, meaning that it will remain lightly utilized for a significant period of time. Nodes that have been out of service or have otherwise suffered issues may also have collected a smaller number of blocks than their peers.

Hadoop includes a tool called the balancer, started and stopped by the start-balancer.sh and stop-balancer.sh scripts respectively, to handle this situation.

When to rebalance

Hadoop does not have any automatic alarms that will alert you to an unbalanced filesystem. Instead, you need to keep an eye on the data reported by both hadoop fsck and hadoop fsadmin and watch for imbalances across the nodes.

In reality, this is not something you usually need to worry about, as Hadoop is very good at managing block placement and you likely only need to consider running the balancer to remove major imbalances when adding new hardware or when returning faulty nodes to service. To maintain maximum cluster health, however, it is not uncommon to have the balancer run on a scheduled basis (for example, nightly) to keep the block balancing within a specified threshold.

MapReduce management

As we saw in the previous chapter, the MapReduce framework is generally more tolerant of problems and failures than HDFS. The JobTracker and TaskTrackers have no persistent data to manage and, consequently, the management of MapReduce is more about the handling of running jobs and tasks than servicing the framework itself.

Command line job management

The hadoop job command-line tool is the primary interface for this job management. As usual, type the following to get a usage summary:

$ hadoop job --help

The options to the command are generally self-explanatory; it allows you to start, stop, list, and modify running jobs in addition to retrieving some elements of job history. Instead of examining each individually, we will explore the use of several of these subcommands together in the next section.

Have a go hero – command line job management

The MapReduce UI also provides access to a subset of these capabilities. Explore the UI and see what you can and cannot do from the web interface.

Job priorities and scheduling

So far, we have generally run a single job against our cluster and waited for it to complete. This has hidden the fact that, by default, Hadoop places subsequent job submissions into a First In, First Out (FIFO) queue. When a job finishes, Hadoop simply starts executing the next job in the queue. Unless we use one of the alternative schedulers that we will discuss in later sections, the FIFO scheduler dedicates the full cluster to the sole currently running job.

For small clusters with a pattern of job submission that rarely sees jobs waiting in the queue, this is completely fine. However, if jobs are often waiting in the queue, issues can arise. In particular, the FIFO model takes no account of job priority or resources needed. A long-running but low-priority job will execute before faster high-priority jobs that were submitted later.

To address this situation, Hadoop defines five levels of job priority: VERY_HIGH, HIGH, NORMAL, LOW, and VERY_LOW. A job defaults to NORMAL priority, but this can be changed with the hadoop job -set-priority command.

Time for action – changing job priorities and killing a job

Let's explore job priorities by changing them dynamically and watching the result of killing a job.

  1. Start a relatively long-running job on the cluster.
    $ hadoop jar hadoop-examples-1.0.4.jar pi 100 1000
    
  2. Open another window and submit a second job.
    $ hadoop jar hadoop-examples-1.0.4.jar wordcount test.txt out1
    
  3. Open another window and submit a third.
    $ hadoop jar hadoop-examples-1.0.4.jar wordcount test.txt out2
    
  4. List the running jobs.
    $ Hadoop job -list
    

    You'll see the following lines on the screen:

    3 jobs currently running
    JobId State StartTime UserName Priority SchedulingInfo
    job_201201111540_0005 1 1326325810671 hadoop NORMAL NA
    job_201201111540_0006 1 1326325938781 hadoop NORMAL NA
    job_201201111540_0007 1 1326325961700 hadoop NORMAL NA
    
  5. Check the status of the running job.
    $ Hadoop job -status job_201201111540_0005
    

    You'll see the following lines on the screen:

    Job: job_201201111540_0005
    file: hdfs://head:9000/var/hadoop/mapred/system/job_201201111540_0005/job.xml
    tracking URL: http://head:50030/jobdetails.jsp?jobid=job_201201111540_000
    map() completion: 1.0
    reduce() completion: 0.32666665
    Counters: 18
    
  6. Raise the priority of the last submitted job to VERY_HIGH.
    $ Hadoop job -set-priority job_201201111540_0007 VERY_HIGH
    
  7. Kill the currently running job.
    $ Hadoop job -kill job_201201111540_0005
    
  8. Watch the other jobs to see which begins processing.

What just happened?

We started a job on the cluster and then queued up another two jobs, confirming that the queued jobs were in the expected order by using hadoop job -list. The hadoop job -list all command would have listed both completed as well as the current jobs and hadoop job -history would have allowed us to examine the jobs and their tasks in much more detail. To confirm the submitted job was running, we used hadoop job -status to get the current map and reduce task completion status for the job, in addition to the job counters.

We then used hadoop job -set-priority to increase the priority of the job currently last in the queue.

After using hadoop job -kill to abort the currently running job, we confirmed the job with the increased priority that executed next, even though the job remaining in the queue was submitted beforehand.

Alternative schedulers

Manually modifying job priorities in the FIFO queue certainly does work, but it requires active monitoring and management of the job queue. If we think about the problem, the reason we are having this difficulty is the fact that Hadoop dedicates the entire cluster to each job being executed.

Hadoop offers two additional job schedulers that take a different approach and share the cluster among multiple concurrently executing jobs. There is also a plugin mechanism by which additional schedulers can be added. Note that this type of resource sharing is one of those problems that is conceptually simple but is in reality very complex and is an area of much academic research. The goal is to maximize resource allocation not only at a point in time, but also over an extended period while honoring notions of relative priority.

Capacity Scheduler

The Capacity Scheduler uses multiple job queues (to which access control can be applied) to which jobs are submitted, each of which is allocated a portion of the cluster resources. You could, for example, have a queue for large long-running jobs that is allocated 90 percent of the cluster and one for smaller high-priority jobs allocated the remaining 10 percent. If both queues have jobs submitted, the cluster resources will be allocated in this proportion.

If, however, one queue is empty and the other has jobs to execute, the Capacity Scheduler will temporarily allocate the capacity of the empty queue to the busy one. Once a job is submitted to the empty queue, it will regain its capacity as the currently running tasks complete execution. This approach gives a reasonable balance between the desired resource allocation and preventing long periods of unused capacity.

Though disabled by default, the Capacity Scheduler supports job priorities within each queue. If a high priority job is submitted after a low priority one, its tasks will be scheduled in preference to the other jobs as capacity becomes available.

Fair Scheduler

The Fair Scheduler segments the cluster into pools into which jobs are submitted; there is often a correlation between the user and the pool. Though by default each pool gets an equal share of the cluster, this can be modified.

Within each pool, the default model is to share the pool across all jobs submitted to that pool. Therefore, if the cluster is split into pools for Alice and Bob, each of whom submit three jobs, the cluster will execute all six jobs in parallel. It is possible to place total limits on the number of concurrent jobs running in a pool, as too many running at once will potentially produce a large amount of temporary data and provide overall inefficient processing.

As with the Capacity Scheduler, the Fair Scheduler will over-allocate cluster capacity to other pools if one is empty, and then reclaim it as the pool receives jobs. It also supports job priorities within a pool to preferentially schedule tasks of high priority jobs over those with a lower priority.

Enabling alternative schedulers

Each of the alternative schedulers is provided as a JAR file in capacityScheduler and fairScheduler directories within the contrib directory in the Hadoop installation. To enable a scheduler, either add its JAR to the hadoop/lib directory or explicitly place it on the classpath. Note that each scheduler requires its own set of properties to configure its usage. Refer to the documentation for each for more details.

When to use alternative schedulers

The alternative schedulers are very effective, but are not really needed on small clusters or those with no need to ensure multiple job concurrency or execution of late-arriving but high-priority jobs. Each has multiple configuration parameters and requires tuning to get optimal cluster utilization. But for any large cluster with multiple users and varying job priorities, they can be essential.

Scaling

You have data and you have a running Hadoop cluster; now you get more of the former and need more of the latter. We have said repeatedly that Hadoop is an easily scalable system. So let us add some new capacity.

Adding capacity to a local Hadoop cluster

Hopefully, at this point, you should feel pretty underwhelmed at the idea of adding another node to a running cluster. All through Chapter 6, When Things Break, we constantly killed and restarted nodes. Adding a new node is really no different; all you need to do is perform the following steps:

  1. Install Hadoop on the host.
  2. Set the environment variables shown in Chapter 2, Getting Up and Running.
  3. Copy the configuration files into the conf directory on the installation.
  4. Add the host's DNS name or IP address to the slaves file on the node from which you usually run commands such as slaves.sh or cluster start/stop scripts.

And that's it!

Have a go hero – adding a node and running balancer

Try out the process of adding a new node and afterwards examine the state of HDFS. If it is unbalanced, run the balancer to fix things. To help maximize the effect, ensure there is a reasonable amount of data on HDFS before adding the new node.

Adding capacity to an EMR job flow

If you are using Elastic MapReduce, for non-persistent clusters, the concept of scaling does not always apply. Since you specify the number and type of hosts required when setting up the job flow each time, you need only ensure that the cluster size is appropriate for the job to be executed.

Expanding a running job flow

However, sometimes you may have a long-running job that you want to complete more quickly. In such a case, you can add more nodes to the running job flow. Recall that EMR has three different types of node: master nodes for NameNode and JobTracker, core nodes for HDFS, and task nodes for MapReduce workers. In this case, you could add additional task nodes to help crunch the MapReduce job.

Another scenario is where you have defined a job flow comprising a series of MapReduce jobs instead of just one. EMR now allows the job flow to be modified between steps in such a series. This has the advantage of each job being given a tailored hardware configuration that gives better control of balancing performance against cost.

The canonical model for EMR is for the job flow to pull its source data from S3, process that data on a temporary EMR Hadoop cluster, and then write results back to S3. If, however, you have a very large data set that requires frequent processing, the copying back and forth of data could become too time-consuming. Another model that can be employed in such a situation is to use a persistent Hadoop cluster within a job flow that has been sized with enough core nodes to store the needed data on HDFS. When processing is performed, increase capacity as shown before by assigning more task nodes to the job flow.

Note

These tasks to resize running job flows are not currently available from the AWS Console and need to be performed through the API or command line tools.

Summary

This chapter covered how to build, maintain, and expand a Hadoop cluster. In particular, we learned where to find the default values for Hadoop configuration properties and how to set them programmatically on a per-job level. We learned how to choose hardware for a cluster and the value in understanding your likely workload before committing to purchases, and how Hadoop can use awareness of the physical location of hosts to optimize its block placement strategy through the use of rack awareness.

We then saw how the default Hadoop security model works, its weaknesses and how to mitigate them, how to mitigate the risks of NameNode failure we introduced in Chapter 6, When Things Break, and how to swap to a new NameNode host if disaster strikes. We learned more about block replica placement, how the cluster can become unbalanced, and what to do if it does.

We also saw the Hadoop model for MapReduce job scheduling and learned how job priorities can modify the behavior, how the Capacity Scheduler and Fair Scheduler give a more sophisticated way of managing cluster resources across multiple concurrent job submissions, and how to expand a cluster with a new capacity.

This completes our exploration of core Hadoop in this book. In the remaining chapters, we will look at other systems and tools that build atop Hadoop to provide more sophisticated views on data and integration with other systems. We will start with a relational view on the data in HDFS through the use of Hive.