Hadoop: Data Processing and Modelling

Chapter 6. When Things Break

One of the main promises of Hadoop is resilience to failure and an ability to survive failures when they do happen. Tolerance to failure will be the focus of this chapter.

In particular, we will cover the following topics:

  • How Hadoop handles failures of DataNodes and TaskTrackers
  • How Hadoop handles failures of the NameNode and JobTracker
  • The impact of hardware failure on Hadoop
  • How to deal with task failures caused by software bugs
  • How dirty data can cause tasks to fail and what to do about it

Along the way, we will deepen our understanding of how the various components of Hadoop fit together and identify some areas of best practice.

Failure

With many technologies, the steps to be taken when things go wrong are rarely covered in much of the documentation and are often treated as topics only of interest to the experts. With Hadoop, it is much more front and center; much of the architecture and design of Hadoop is predicated on executing in an environment where failures are both frequent and expected.

Embrace failure

In recent years, a different mindset than the traditional one has been described by the term embrace failure. Instead of hoping that failure does not happen, accept the fact that it will and know how your systems and processes will respond when it does.

Or at least don't fear it

That's possibly a stretch, so instead, our goal in this chapter is to make you feel more comfortable about failures in the system. We'll be killing the processes of a running cluster, intentionally causing the software to fail, pushing bad data into our jobs, and generally causing as much disruption as we can.

Don't try this at home

Often when trying to break a system, a test instance is abused, leaving the operational system protected from the disruption. We will not advocate doing the things given in this chapter to an operational Hadoop cluster, but the fact is that apart from one or two very specific cases, you could. The goal is to understand the impact of the various types of failures so that when they do happen on the business-critical system, you will know whether it is a problem or not. Fortunately, the majority of cases are handled for you by Hadoop.

Types of failure

We will generally categorize failures into the following five types:

  • Failure of a node, that is, DataNode or TaskTracker process
  • Failure of a cluster's masters, that is, NameNode or JobTracker process
  • Failure of hardware, that is, host crash, hard drive failure, and so on
  • Failure of individual tasks within a MapReduce job due to software errors
  • Failure of individual tasks within a MapReduce job due to data problems

We will explore each of these in turn in the following sections.

Hadoop node failure

The first class of failure that we will explore is the unexpected termination of the individual DataNode and TaskTracker processes. Given Hadoop's claims of managing system availability through survival of failures on its commodity hardware, we can expect this area to be very solid. Indeed, as clusters grow to hundreds or thousands of hosts, failures of individual nodes are likely to become quite commonplace.

Before we start killing things, let's introduce a new tool and set up the cluster properly.

The dfsadmin command

As an alternative tool to constantly viewing the HDFS web UI to determine the cluster status, we will use the dfsadmin command-line tool:

$ hadoop dfsadmin

This will give a list of the various options the command can take; for our purposes we'll be using the -report option. This gives an overview of the overall cluster state, including configured capacity, nodes, and files as well as specific details about each configured node.
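If you would rather not eyeball the report each time, the summary lines can be pulled out with a few regular expressions. The following is a minimal sketch, not part of Hadoop: the function names `summarize_report` and `is_healthy` are our own, and the sample text simply mirrors the summary format printed by the cluster used in this chapter.

```python
import re

# Sample summary section in the format printed by `hadoop dfsadmin -report`
# on the cluster used in this chapter.
SAMPLE_REPORT = """\
Configured Capacity: 81376493568 (75.79 GB)
Present Capacity: 61117323920 (56.92 GB)
DFS Remaining: 59576766464 (55.49 GB)
DFS Used: 1540557456 (1.43 GB)
DFS Used%: 2.52%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Datanodes available: 4 (4 total, 0 dead)
"""

# Hypothetical field names mapped to the report lines they are read from.
PATTERNS = {
    "under_replicated": r"^Under replicated blocks:\s*(\d+)",
    "corrupt": r"^Blocks with corrupt replicas:\s*(\d+)",
    "missing": r"^Missing blocks:\s*(\d+)",
    "live_nodes": r"^Datanodes available:\s*(\d+)",
    "dead_nodes": r"^Datanodes available:.*?(\d+) dead",
}


def summarize_report(text):
    """Extract block and node counts from the summary section of a report."""
    summary = {}
    for key, pattern in PATTERNS.items():
        match = re.search(pattern, text, flags=re.MULTILINE)
        summary[key] = int(match.group(1)) if match else None
    return summary


def is_healthy(summary):
    """True when no blocks are degraded and no DataNode is reported dead."""
    keys = ("under_replicated", "corrupt", "missing", "dead_nodes")
    return all(summary[key] == 0 for key in keys)


print(summarize_report(SAMPLE_REPORT))
```

In practice you would feed the function the captured output of the real command rather than a hard-coded string; the parsing assumes the line labels stay exactly as shown here.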

Cluster setup, test files, and block sizes

We will need a fully distributed cluster for the following activities; refer to the setup instructions given earlier in the book. The screenshots and examples that follow use a cluster of one host for the JobTracker and NameNode and four slave nodes for running the DataNode and TaskTracker processes.

Tip

Remember that you don't need physical hardware for each node; we use virtual machines for our cluster.

In normal usage, 64 MB is the usual configured block size for a Hadoop cluster. For our testing purposes, that is terribly inconvenient as we'll need pretty large files to get meaningful block counts across our multinode cluster.

What we can do is reduce the configured block size; in this case, we will use 4 MB. Make the following modifications to the hdfs-site.xml file within the Hadoop conf directory:

<property>
<name>dfs.block.size</name>
<value>4194304</value>
</property>
<property>
<name>dfs.namenode.logging.level</name>
<value>all</value>
</property>

The first property makes the required changes to the block size and the second one increases the NameNode logging level to make some of the block operations more visible.

Note

Both these settings are appropriate for this test setup but would rarely be seen on a production cluster. Though the higher NameNode logging may be required if a particularly difficult problem is being investigated, it is highly unlikely you would ever want a block size as small as 4 MB. Though the smaller block size will work fine, it will impact Hadoop's efficiency.

We also need a reasonably sized test file that will comprise multiple 4 MB blocks. We won't actually be using the content of the file, so the type of file is irrelevant, but you should copy the largest file you can onto HDFS for the following sections. We used a CD ISO image:

$ hadoop fs -put cd.iso file1.data
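To see why the smaller block size matters for our tests, it helps to work out how many blocks a file produces. The sketch below is plain arithmetic; the 700 MB file size is purely an assumption for illustration (we have not stated the size of our ISO image), and `block_count` is a hypothetical helper name.

```python
MB = 1024 * 1024


def block_count(file_size_bytes, block_size_bytes):
    """Number of HDFS blocks needed; the final block may be only partly filled."""
    # Ceiling division using integers only, to avoid floating-point rounding.
    return -(-file_size_bytes // block_size_bytes)


iso_size = 700 * MB           # hypothetical size of a CD image
default_block = 64 * MB       # the usual block size mentioned in the text
test_block = 4194304          # the dfs.block.size value configured above (4 MB)

print(block_count(iso_size, default_block))
print(block_count(iso_size, test_block))
print(block_count(iso_size, test_block) * 3)  # replicas at replication factor 3
```

Under these assumptions the file occupies 11 blocks at 64 MB but 175 blocks at 4 MB, or 525 replicas across the cluster, which gives us far more block movement to observe when nodes fail.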

Fault tolerance and Elastic MapReduce

The examples in this book are for a local Hadoop cluster because this allows some of the failure mode details to be more explicit. EMR provides exactly the same failure tolerance as the local cluster, so the failure scenarios described here apply equally to a local Hadoop cluster and the one hosted by EMR.

Time for action – killing a DataNode process

Firstly, we'll kill a DataNode. Recall that the DataNode process runs on each host in the HDFS cluster and is responsible for the management of blocks within the HDFS filesystem. Because Hadoop, by default, uses a replication factor of 3 for blocks, we should expect a single DataNode failure to have no direct impact on availability; rather, it will result in some blocks temporarily falling below the replication threshold. Execute the following steps to kill a DataNode process:

  1. Firstly, check on the original status of the cluster and check whether everything is healthy. We'll use the dfsadmin command for this:
    $ hadoop dfsadmin -report
    Configured Capacity: 81376493568 (75.79 GB)
    Present Capacity: 61117323920 (56.92 GB)
    DFS Remaining: 59576766464 (55.49 GB)
    DFS Used: 1540557456 (1.43 GB)
    DFS Used%: 2.52%
    Under replicated blocks: 0
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    -------------------------------------------------
    Datanodes available: 4 (4 total, 0 dead)
    Name: 10.0.0.102:50010
    Decommission Status : Normal
    Configured Capacity: 20344123392 (18.95 GB)
    DFS Used: 403606906 (384.91 MB)
    Non DFS Used: 5063119494 (4.72 GB)
    DFS Remaining: 14877396992(13.86 GB)
    DFS Used%: 1.98%
    DFS Remaining%: 73.13%
    Last contact: Sun Dec 04 15:16:27 PST 2011
    
    

    Now log onto one of the nodes and use the jps command to determine the process ID of the DataNode process:

    $ jps
    2085 TaskTracker
    2109 Jps
    1928 DataNode
    
  2. Use the process ID (PID) of the DataNode process and kill it:
    $ kill -9 1928
    
  3. Check that the DataNode process is no longer running on the host:
    $ jps
    2085 TaskTracker
    
  4. Check the status of the cluster again by using the dfsadmin command:
    $ hadoop dfsadmin -report
    Configured Capacity: 81376493568 (75.79 GB)
    Present Capacity: 61117323920 (56.92 GB)
    DFS Remaining: 59576766464 (55.49 GB)
    DFS Used: 1540557456 (1.43 GB)
    DFS Used%: 2.52%
    Under replicated blocks: 0
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    -------------------------------------------------
    Datanodes available: 4 (4 total, 0 dead)
    
    
  5. The key lines to watch are the lines reporting on blocks, live nodes, and the last contact time for each node. Once the last contact time for the dead node is around 10 minutes, use the command more frequently until the block and live node values change:
    $ hadoop dfsadmin -report
    Configured Capacity: 61032370176 (56.84 GB)
    Present Capacity: 46030327050 (42.87 GB)
    DFS Remaining: 44520288256 (41.46 GB)
    DFS Used: 1510038794 (1.41 GB)
    DFS Used%: 3.28%
    Under replicated blocks: 12
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    -------------------------------------------------
    Datanodes available: 3 (4 total, 1 dead)
    
    
  6. Repeat the process until the count of under-replicated blocks is once again 0:
    $ hadoop dfsadmin -report
    
    Under replicated blocks: 0
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    -------------------------------------------------
    Datanodes available: 3 (4 total, 1 dead)
    
    

What just happened?

The high-level story is pretty straightforward; Hadoop recognized the loss of a node and worked around the problem. However, quite a lot is going on to make that happen.

When we killed the DataNode process, the process on that host was no longer available to serve or receive data blocks as part of the read/write operations. However, we were not actually accessing the filesystem at the time, so how did the NameNode process know this particular DataNode was dead?

NameNode and DataNode communication

The answer lies in the constant communication between the NameNode and DataNode processes that we have alluded to once or twice but never really explained. This occurs through a constant series of heartbeat messages from the DataNode reporting on its current state and the blocks it holds. In return, the NameNode gives instructions to the DataNode, such as notification of the creation of a new file or an instruction to retrieve a block from another node.

It all begins when the NameNode process starts up and begins receiving status messages from the DataNodes. Recall that each DataNode knows the location of its NameNode and will continuously send status reports. These messages list the blocks held by each DataNode, and from them the NameNode is able to construct a complete mapping that relates files and directories to the blocks of which they are composed and the nodes on which those blocks are stored.

The NameNode process monitors the last time it received a heartbeat from each DataNode and after a threshold is reached, it assumes the DataNode is no longer functional and marks it as dead.

Note

The exact threshold after which a DataNode is assumed to be dead is not configurable as a single HDFS property. Instead, it is calculated from several other properties, such as the one defining the heartbeat interval. As we'll see later, things are a little easier in the MapReduce world as the timeout for TaskTrackers is controlled by a single configuration property.
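As a worked example of how such a threshold can be derived, the sketch below assumes the calculation used by Hadoop 1.x, namely twice the recheck interval plus ten heartbeat intervals, with defaults of 300000 ms for `heartbeat.recheck.interval` and 3 seconds for `dfs.heartbeat.interval`. Treat both the formula and the defaults as assumptions to verify against your own version; the function name is ours.

```python
def datanode_expiry_seconds(recheck_interval_ms=300_000, heartbeat_interval_s=3):
    """Seconds of silence before a DataNode is treated as dead.

    Assumed formula (Hadoop 1.x): 2 * recheck interval + 10 * heartbeat interval.
    The recheck interval is given in milliseconds, the heartbeat in seconds.
    """
    expiry_ms = 2 * recheck_interval_ms + 10 * heartbeat_interval_s * 1000
    return expiry_ms / 1000


seconds = datanode_expiry_seconds()
print(seconds, "seconds, or", seconds / 60, "minutes")
```

With the assumed defaults this comes to 630 seconds, or ten and a half minutes, which matches the wait of around 10 minutes we saw before the killed DataNode was reported as dead.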

Once a DataNode is marked as dead, the NameNode process determines the blocks which were held on that node and have now fallen below their replication target. In the default case, each block held on the killed node would have been one of the three replicas, so each block for which the node held a replica will now have only two replicas across the cluster.

In the preceding example, we captured the state when 12 blocks were still under-replicated, that is they did not have enough replicas across the cluster to meet the replication target. When the NameNode process determines the under-replicated blocks, it assigns other DataNodes to copy these blocks from the hosts where the existing replicas reside. In this case we only had to re-replicate a very small number of blocks; in a live cluster, the failure of a node can result in a period of high network traffic as the affected blocks are brought up to their replication factor.
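The bookkeeping involved can be illustrated with a toy model. This is a sketch only and not how the NameNode is actually implemented: the block and node names are made up, and `under_replicated` and `plan_copies` are hypothetical helpers that work from a simple block-to-holders map.

```python
REPLICATION_TARGET = 3


def under_replicated(block_map, live_nodes, target=REPLICATION_TARGET):
    """Return {block: live replica count} for blocks below the target."""
    result = {}
    for block, holders in block_map.items():
        live_replicas = len(holders & live_nodes)
        if live_replicas < target:
            result[block] = live_replicas
    return result


def plan_copies(block_map, live_nodes, target=REPLICATION_TARGET):
    """List (block, source, destination) copies needed to restore the target."""
    plan = []
    for block, holders in sorted(block_map.items()):
        live_holders = holders & live_nodes
        needed = target - len(live_holders)
        if needed <= 0 or not live_holders:
            continue  # fully replicated, or missing with no source to copy from
        source = sorted(live_holders)[0]
        candidates = sorted(live_nodes - holders)  # live nodes without a replica
        for destination in candidates[:needed]:
            plan.append((block, source, destination))
    return plan


# Hypothetical layout: four blocks spread over four nodes, three replicas each.
block_map = {
    "blk_1": {"node1", "node2", "node3"},
    "blk_2": {"node1", "node2", "node4"},
    "blk_3": {"node2", "node3", "node4"},
    "blk_4": {"node1", "node3", "node4"},
}
live = {"node1", "node2", "node3"}  # node4 has been killed

print(under_replicated(block_map, live))
print(plan_copies(block_map, live))
```

Here three of the four blocks lose a replica when node4 dies, and each gets one copy scheduled from a surviving holder to the only live node that lacks it. The real NameNode also weighs rack placement and node load when choosing targets, which this sketch ignores.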

Note that if a failed node returns to the cluster, we have the situation of blocks having more than the required number of replicas; in such a case the NameNode process will send instructions to remove the surplus replicas. The specific replica to be deleted is chosen randomly, so the result will be that the returned node will end up retaining some of its blocks and deleting the others.

Have a go hero – NameNode log delving

We configured the NameNode process to log all its activities. Have a look through these very verbose logs and attempt to identify the replication requests being sent.

The final output shows the status after the under-replicated blocks have been copied to the live nodes. The cluster is down to only three live nodes but there are no under-replicated blocks.

Tip

A quick way to restart the dead nodes across all hosts is to use the start-all.sh script. It will attempt to start everything but is smart enough to detect the running services, which means you get the dead nodes restarted without the risk of duplicates.

Time for action – the replication factor in action

Let's repeat the preceding process, but this time, kill two DataNodes out of our cluster of four. We will give an abbreviated walk-through of the activity as it is very similar to the previous Time for action section:

  1. Restart the dead DataNode and monitor the cluster until all nodes are marked as live.
  2. Pick two DataNodes, use the process ID, and kill the DataNode processes.
  3. As done previously, wait for around 10 minutes then actively monitor the cluster state via dfsadmin, paying particular attention to the reported number of under-replicated blocks.
  4. Wait until the cluster has stabilized with an output similar to the following:
    Configured Capacity: 61032370176 (56.84 GB)
    Present Capacity: 45842373555 (42.69 GB)
    DFS Remaining: 44294680576 (41.25 GB)
    DFS Used: 1547692979 (1.44 GB)
    DFS Used%: 3.38%
    Under replicated blocks: 125
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    -------------------------------------------------
    Datanodes available: 2 (4 total, 2 dead)
    
    

What just happened?

This is the same process as before; the difference is that due to two DataNode failures there were significantly more blocks that fell below the replication factor, many going down to a single remaining replica. Consequently, you should see more activity in the reported number of under-replicated blocks as it first increases when the nodes fail and then drops as re-replication occurs. These events can also be seen in the NameNode logs.

Note that though Hadoop can use re-replication to bring those blocks with only a single remaining replica up to two replicas, this still leaves the blocks in an under-replicated state. With only two live nodes in the cluster, it is now impossible for any block to meet the default replication target of three.

We have been truncating the dfsadmin output for space reasons; in particular, we have been omitting the reported information for each node. However, let's take a look at the first node in our cluster through the previous stages. Before we started killing any DataNode, it reported the following:

Name: 10.0.0.101:50010
Decommission Status : Normal
Configured Capacity: 20344123392 (18.95 GB)
DFS Used: 399379827 (380.88 MB)
Non DFS Used: 5064258189 (4.72 GB)
DFS Remaining: 14880485376(13.86 GB)
DFS Used%: 1.96%
DFS Remaining%: 73.14%
Last contact: Sun Dec 04 15:16:27 PST 2011

After a single DataNode was killed and all blocks had been re-replicated as necessary, it reported the following:

Name: 10.0.0.101:50010
Decommission Status : Normal
Configured Capacity: 20344123392 (18.95 GB)
DFS Used: 515236022 (491.37 MB)
Non DFS Used: 5016289098 (4.67 GB)
DFS Remaining: 14812598272(13.8 GB)
DFS Used%: 2.53%
DFS Remaining%: 72.81%
Last contact: Sun Dec 04 15:31:22 PST 2011

The thing to note is the increase in the local DFS storage on the node. This shouldn't be a surprise. With a dead node, the others in the cluster need to add some additional block replicas and that will translate to a higher storage utilization on each.

Finally, the following is the node's report after two other DataNodes were killed:

Name: 10.0.0.101:50010
Decommission Status : Normal
Configured Capacity: 20344123392 (18.95 GB)
DFS Used: 514289664 (490.46 MB)
Non DFS Used: 5063868416 (4.72 GB)
DFS Remaining: 14765965312(13.75 GB)
DFS Used%: 2.53%
DFS Remaining%: 72.58%
Last contact: Sun Dec 04 15:43:47 PST 2011

With two dead nodes it may seem as if the remaining live nodes should consume even more local storage space, but this isn't the case and it's yet again a natural consequence of the replication factor.

If we have four nodes and a replication factor of 3, each block will have a replica on three of the live nodes in the cluster. If a node dies, the blocks living on the other nodes are unaffected, but any blocks with a replica on the dead node will need a new replica created. However, with only three live nodes, each node will hold a replica of every block. If a second node fails, the result is under-replicated blocks, and Hadoop has nowhere to put the additional replicas. Since both remaining nodes already hold a replica of each block, their storage utilization does not increase.
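The same reasoning can be expressed as simple arithmetic. The sketch below assumes replicas are spread evenly and that a node never holds more than one replica of a given block; the figure of 120 blocks is a made-up number chosen to keep the results round, and `replicas_per_node` is a hypothetical helper.

```python
def replicas_per_node(total_blocks, live_nodes, replication=3):
    """Average number of block replicas stored on each live node."""
    # A node holds at most one replica of a block, so the achievable
    # replication can never exceed the number of live nodes.
    effective_replication = min(replication, live_nodes)
    return total_blocks * effective_replication / live_nodes


for nodes in (4, 3, 2):
    print(nodes, "live nodes:", replicas_per_node(120, nodes), "replicas each")
```

With four live nodes each holds 90 replicas on average; with three it holds all 120, an increase of one third; with two it still holds 120. That is roughly in line with the first node's DFS usage above, which grew from 380.88 MB to 491.37 MB after the first failure and then stayed flat after the second.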

Time for action – intentionally causing missing blocks

The next step should be obvious; let's kill three DataNodes in quick succession.

Tip

This is the first of the activities we mentioned that you really should not do on a production cluster. Although there will be no data loss if the steps are followed properly, there is a period when the existing data is unavailable.

The following are the steps to kill three DataNodes in quick succession:

  1. Restart all the nodes by using the following command:
    $ start-all.sh
    
  2. Wait until hadoop dfsadmin -report shows four live nodes.
  3. Put a new copy of the test file onto HDFS:
    $ hadoop fs -put file1.data file1.new
    
  4. Log onto three of the cluster hosts and kill the DataNode process on each.
  5. Wait for the usual 10 minutes then start monitoring the cluster via dfsadmin until you get output similar to the following that reports the missing blocks:
    
    Under replicated blocks: 123
    Blocks with corrupt replicas: 0
    Missing blocks: 33
    -------------------------------------------------
    Datanodes available: 1 (4 total, 3 dead)
    
    
  6. Try and retrieve the test file from HDFS:
    $ hadoop fs -get file1.new file1.new
    11/12/04 16:18:05 INFO hdfs.DFSClient: No node available for block: blk_1691554429626293399_1003 file=/user/hadoop/file1.new
    11/12/04 16:18:05 INFO hdfs.DFSClient: Could not obtain block blk_1691554429626293399_1003 from any node: java.io.IOException: No live nodes contain current block
    
    get: Could not obtain block: blk_1691554429626293399_1003 file=/user/hadoop/file1.new
    
  7. Restart the dead nodes using the start-all.sh script:
    $ start-all.sh
    
  8. Repeatedly monitor the status of the blocks:
    $ hadoop dfsadmin -report | grep -i blocks
    Under replicated blocks: 69
    Blocks with corrupt replicas: 0
    Missing blocks: 35
    $ hadoop dfsadmin -report | grep -i blocks
    Under replicated blocks: 0
    Blocks with corrupt replicas: 0
    Missing blocks: 30
    
  9. Wait until there are no reported missing blocks then copy the test file onto the local filesystem:
    $ hadoop fs -get file1.new file1.new
    
  10. Perform an MD5 check on this and the original file:
    $ md5sum file1.*
    f1f30b26b40f8302150bc2a494c1961d file1.data
    f1f30b26b40f8302150bc2a494c1961d file1.new
    

What just happened?

After restarting the killed nodes, we copied the test file onto HDFS again. This isn't strictly necessary as we could have used the existing file but due to the shuffling of the replicas, a clean copy gives the most representative results.

We then killed three DataNodes as before and waited for HDFS to respond. Unlike the previous examples, killing this many nodes meant it was certain that some blocks would have all of their replicas on the killed nodes. As we can see, this is exactly the result; the remaining single node cluster shows over a hundred blocks that are under-replicated (obviously only one replica remains) but there are also 33 missing blocks.

Talking of blocks is a little abstract, so we then try to retrieve our test file which, as we know, effectively has 33 holes in it. The attempt to access the file fails as Hadoop could not find the missing blocks required to deliver the file.

We then restarted all the nodes and tried to retrieve the file again. This time it was successful, but we took an added precaution of performing an MD5 cryptographic check on the file to confirm that it was bitwise identical to the original one — which it is.
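If md5sum is not available on your platform, the same comparison can be scripted. The following is a minimal sketch using Python's standard hashlib module; the helper names `md5_of_file` and `files_identical` are our own, and the file names in the usage note are the ones from the steps above.

```python
import hashlib


def md5_of_file(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # Read fixed-size chunks so large files need not fit in memory.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def files_identical(path_a, path_b):
    """True when both files have the same MD5 digest."""
    return md5_of_file(path_a) == md5_of_file(path_b)
```

Calling `files_identical("file1.data", "file1.new")` in the directory holding both copies should return True if the retrieved file matches the original.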

This is an important point: though node failure may result in data becoming unavailable, there may not be a permanent data loss if the node recovers.

When data may be lost

Do not assume from this example that it's impossible to lose data in a Hadoop cluster. For general use it is very hard, but disaster often has a habit of striking in just the wrong way.

As seen in the previous example, a parallel failure of a number of nodes equal to or greater than the replication factor has a chance of resulting in missing blocks. In our example of three dead nodes in a cluster of four, the chances were high; in a cluster of 1000, it would be much lower but still non-zero. As the cluster size increases, so does the failure rate and having three node failures in a narrow window of time becomes less and less unlikely. Conversely, the impact also decreases but rapid multiple failures will always carry a risk of data loss.
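A rough feel for these odds comes from a little combinatorics. The sketch below makes a strong simplifying assumption: each block's replicas sit on a uniformly random set of distinct nodes, which ignores the rack-aware placement HDFS actually uses. The function names are hypothetical, and the figure of 156 blocks is simply the 123 under-replicated plus 33 missing blocks from the earlier report.

```python
from math import comb


def p_block_missing(total_nodes, failed_nodes, replication=3):
    """Chance that every replica of one block sits on a failed node.

    Assumes replicas are placed on a uniformly random set of distinct nodes.
    """
    if failed_nodes < replication:
        return 0.0
    return comb(failed_nodes, replication) / comb(total_nodes, replication)


def expected_missing(total_blocks, total_nodes, failed_nodes, replication=3):
    """Expected number of blocks left with no live replica."""
    return total_blocks * p_block_missing(total_nodes, failed_nodes, replication)


print(p_block_missing(4, 3))        # our four-node cluster with three dead
print(p_block_missing(1000, 3))     # three simultaneous failures in 1000 nodes
print(expected_missing(156, 4, 3))  # blocks on our cluster at the time
```

Under this model a quarter of the blocks on our cluster, about 39 of 156, would be expected to go missing, which is in the same region as the 33 we observed. For three failures in a 1000-node cluster the per-block chance drops to 1 in 166,167,000, which is tiny but, across millions of blocks and repeated failures, not something to dismiss.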

Another more insidious problem is recurring or partial failures, for example, when power issues across the cluster cause nodes to crash and restart. It is possible for Hadoop to end up chasing replication targets, constantly asking the recovering hosts to replicate under-replicated blocks, and also seeing them fail mid-way through the task. Such a sequence of events can also raise the potential of data loss.

Finally, never forget the human factor. Having a replication factor equal to the size of the cluster—ensuring every block is on every node—won't help you when a user accidentally deletes a file or directory.

The summary is that data loss through system failure is pretty unlikely but is possible through almost inevitable human action. Replication is not a full alternative to backups; ensure that you understand the importance of the data you process and the impact of the types of loss discussed here.

Note

The most catastrophic losses in a Hadoop cluster are actually caused by NameNode failure and filesystem corruption; we'll discuss this topic in some detail in the next chapter.

Block corruption

The reports from each DataNode also included a count of the corrupt blocks, which we have not referred to. When a block is first stored, there is also a hidden file written to the same HDFS directory containing checksums for the block. By default, there is a checksum for each 512-byte chunk within the block.

Whenever any client reads a block, it will also retrieve the list of checksums and compare these to the checksums it generates on the block data it has read. If there is a checksum mismatch, the block on that particular DataNode will be marked as corrupt and the client will retrieve a different replica. On learning of the corrupt block, the NameNode will schedule a new replica to be made from one of the existing uncorrupted replicas.
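The per-chunk verification can be sketched in a few lines. This is an illustration of the idea rather than the HDFS implementation: it uses CRC-32 from Python's zlib module as the checksum, which is an assumption of this sketch, and `chunk_checksums` and `first_corrupt_chunk` are hypothetical helper names.

```python
import zlib

BYTES_PER_CHECKSUM = 512  # chunk size given in the text


def chunk_checksums(data, chunk_size=BYTES_PER_CHECKSUM):
    """Compute one CRC-32 checksum per chunk of the block data."""
    return [
        zlib.crc32(data[offset:offset + chunk_size])
        for offset in range(0, len(data), chunk_size)
    ]


def first_corrupt_chunk(data, stored_checksums, chunk_size=BYTES_PER_CHECKSUM):
    """Return the index of the first chunk whose checksum differs, or None."""
    actual = chunk_checksums(data, chunk_size)
    for index, (computed, stored) in enumerate(zip(actual, stored_checksums)):
        if computed != stored:
            return index
    if len(actual) != len(stored_checksums):
        return min(len(actual), len(stored_checksums))  # length mismatch
    return None


# Simulate writing a 2 KB block, then flipping one byte before it is read.
block = bytes(range(256)) * 8
stored = chunk_checksums(block)          # saved when the block is written
damaged = bytearray(block)
damaged[1000] ^= 0xFF                    # corrupt a byte inside the second chunk

print(first_corrupt_chunk(block, stored))
print(first_corrupt_chunk(bytes(damaged), stored))
```

A clean read reports no mismatch, while the damaged copy is caught at chunk index 1. A client that sees such a mismatch would, as described above, report the replica as corrupt and fetch the block from another DataNode.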

If the scenario seems unlikely, consider that faulty memory, disk drive, storage controller, or numerous other issues on an individual host could cause some corruption to a block as it is initially being written, while it is being stored, or when it is being read. These are rare events and the chances of the same corruption occurring on all DataNodes holding replicas of the same block become exceptionally remote. However, remember as previously mentioned that replication is not a full alternative to backup and if you need 100 percent data availability, you likely need to think about off-cluster backup.

Time for action – killing a TaskTracker process

We've abused HDFS and its DataNode enough; now let's see what damage we can do to MapReduce by killing some TaskTracker processes.

Though there is an mradmin command, it does not give the sort of status reports we are used to with HDFS. So we'll use the MapReduce web UI (located by default on port 50030 on the JobTracker host) to monitor the MapReduce cluster health.

Perform the following steps:

  1. Ensure everything is running via the start-all.sh script then point your browser at the MapReduce web UI. The page should look like the following screenshot:
  2. Start a long-running MapReduce job; the example pi estimator with large values is great for this:
    $ hadoop jar hadoop/hadoop-examples-1.0.4.jar pi 2500 2500
    
  3. Now log onto a cluster node and use jps to identify the TaskTracker process:
    $ jps
    21822 TaskTracker
    3918 Jps
    3891 DataNode
    
  4. Kill the TaskTracker process:
    $ kill -9 21822
    
  5. Verify that the TaskTracker is no longer running:
    $ jps
    3918 Jps
    3891 DataNode
    
  6. Go back to the MapReduce web UI and after 10 minutes you should see that the number of nodes and available map/reduce slots change as shown in the following screenshot:
  7. Monitor the job progress in the original window; it should be proceeding, even if it is slow.
  8. Restart the dead TaskTracker process:
    $ start-all.sh
    
  9. Monitor the MapReduce web UI. After a little time the number of nodes should be back to its original number as shown in the following screenshot:

What just happened?

The MapReduce web interface provides a lot of information on both the cluster as well as the jobs it executes. For our interests here, the important data is the cluster summary that shows the currently executing number of map and reduce tasks, the total number of submitted jobs, the number of nodes and their map and reduce capacity, and finally, any blacklisted nodes.

The relationship of the JobTracker process to the TaskTracker process is quite different than that between NameNode and DataNode but a similar heartbeat/monitoring mechanism is used.

The TaskTracker process frequently sends heartbeats to the JobTracker, but instead of status reports of block health, they contain progress reports of the assigned task and available capacity. Each node has a configurable number of map and reduce task slots (the default for each is two), which is why we see four nodes and eight map and reduce slots in the first web UI screenshot.

When we kill the TaskTracker process, its lack of heartbeats is measured by the JobTracker process and after a configurable amount of time, the node is assumed to be dead and we see the reduced cluster capacity reflected in the web UI.

Tip

The timeout for a TaskTracker process to be considered dead is modified by the mapred.tasktracker.expiry.interval property, configured in mapred-site.xml.

When a TaskTracker process is marked as dead, the JobTracker process also considers its in-progress tasks as failed and re-assigns them to the other nodes in the cluster. We see this implicitly by watching the job proceed successfully despite a node being killed.
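The expiry-and-reassign behaviour can be modelled with a small class. This is a toy sketch, not the JobTracker's real scheduling logic: the class, the tracker and task names, and the round-robin reassignment are all our own inventions, and the 600-second expiry assumes the default value of 600000 ms for mapred.tasktracker.expiry.interval.

```python
EXPIRY_SECONDS = 600  # assumed default: 600000 ms expiry interval
SLOTS_PER_NODE = 2    # default map (and reduce) slots per node, per the text


class HeartbeatMonitor:
    """Toy model of a JobTracker's view of its TaskTrackers."""

    def __init__(self, expiry_seconds=EXPIRY_SECONDS):
        self.expiry_seconds = expiry_seconds
        self.last_seen = {}  # tracker name -> time of last heartbeat
        self.assigned = {}   # tracker name -> list of in-progress tasks

    def heartbeat(self, tracker, now):
        """Record a heartbeat; unknown trackers are (re)integrated."""
        self.last_seen[tracker] = now
        self.assigned.setdefault(tracker, [])

    def assign(self, tracker, task):
        self.assigned[tracker].append(task)

    def map_slots(self):
        """Map slot capacity across the trackers currently considered live."""
        return len(self.last_seen) * SLOTS_PER_NODE

    def expire(self, now):
        """Mark silent trackers dead and hand their tasks to live ones."""
        dead = sorted(
            tracker for tracker, seen in self.last_seen.items()
            if now - seen > self.expiry_seconds
        )
        orphaned = []
        for tracker in dead:
            del self.last_seen[tracker]
            orphaned.extend(self.assigned.pop(tracker))
        live = sorted(self.last_seen)
        if orphaned and not live:
            raise RuntimeError("no live TaskTrackers left to run tasks")
        for index, task in enumerate(orphaned):
            # Simple round-robin; the real scheduler is far more involved.
            self.assigned[live[index % len(live)]].append(task)
        return dead
```

For example, register four trackers with `heartbeat`, give one of them two tasks with `assign`, keep sending heartbeats for the other three, and then call `expire` once more than 600 seconds have passed: the silent tracker is returned as dead, its two tasks move to live trackers, and `map_slots()` drops from eight to six, just as we saw in the web UI.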

After the TaskTracker process is restarted it sends a heartbeat to the JobTracker, which marks it as alive and reintegrates it into the MapReduce cluster. This we see through the cluster node and task slot capacity returning to their original values as can be seen in the final screenshot.

Comparing the DataNode and TaskTracker failures

We'll not perform similar two or three node killing activities with TaskTrackers as the task execution architecture renders individual TaskTracker failures relatively unimportant. Because the TaskTracker processes are under the control and coordination of JobTracker, their individual failures have no direct effect other than to reduce the cluster execution capacity. If a TaskTracker instance fails, the JobTracker will simply schedule the failed tasks on a healthy TaskTracker process in the cluster. The JobTracker is free to reschedule tasks around the cluster because TaskTracker is conceptually stateless; a single failure does not affect other parts of the job.

In contrast, loss of a DataNode—which is intrinsically stateful—can affect the persistent data held on HDFS, potentially making it unavailable.

This highlights the nature of the various nodes and their relationship to the overall Hadoop framework. The DataNode manages data, and the TaskTracker reads and writes that data. Catastrophic failure of every TaskTracker would still leave us with a completely functional HDFS; a similar failure of the NameNode process would leave a live MapReduce cluster that is effectively useless (unless it was configured to use a different storage system).

Permanent failure

Our recovery scenarios so far have assumed that the dead node can be restarted on the same physical host. But what if it can't due to the host having a critical failure? The answer is simple; you can remove the host from the slaves file and Hadoop will no longer try to start a DataNode or TaskTracker on that host. Conversely, if you get a replacement machine with a different hostname, add this new host to the same file and run start-all.sh.

Note

Note that the slaves file is only used by tools such as the start/stop and slaves.sh scripts. You don't need to keep it updated on every node, but only on the hosts where you generally run such commands. In practice, this is likely to be either a dedicated head node or the host where the NameNode or JobTracker processes run. We'll explore these setups in Chapter 7, Keeping Things Running.

Killing the cluster masters

Though the failure impact of DataNode and TaskTracker processes is different, each individual node is relatively unimportant. Failure of any single TaskTracker or DataNode is not a cause for concern and issues only occur if multiple nodes fail, particularly in quick succession. But we only have one JobTracker and NameNode; let's explore what happens when they fail.

Time for action – killing the JobTracker

We'll first kill the JobTracker process which we should expect to impact our ability to execute MapReduce jobs but not affect the underlying HDFS filesystem.

  1. Log on to the JobTracker host and kill its process.
  2. Attempt to start a test MapReduce job such as Pi or WordCount:
    $ hadoop jar wc.jar WordCount3 test.txt output
    Starting Job
    11/12/11 16:03:29 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9001. Already tried 0 time(s).
    11/12/11 16:03:30 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9001. Already tried 1 time(s).
    
    11/12/11 16:03:38 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9001. Already tried 9 time(s).
    java.net.ConnectException: Call to /10.0.0.100:9001 failed on connection exception: java.net.ConnectException: Connection refused
     at org.apache.hadoop.ipc.Client.wrapException(Client.java:767)
     at org.apache.hadoop.ipc.Client.call(Client.java:743)
     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
    
    
  3. Perform some HDFS operations:
    $ hadoop fs -ls /
    Found 2 items
    drwxr-xr-x - hadoop supergroup 0 2011-12-11 19:19 /user
    drwxr-xr-x - hadoop supergroup 0 2011-12-04 20:38 /var
    $ hadoop fs -cat test.txt
    This is a test file
    

What just happened?

After killing the JobTracker process we attempted to launch a MapReduce job. Recall from our earlier walk-through that the client starting the job attempts to communicate with the JobTracker process to initiate the job scheduling activities. In this case there was no running JobTracker, so this communication did not happen and the job failed.

We then performed a few HDFS operations to highlight the point in the previous section; a non-functional MapReduce cluster will not directly impact HDFS, which will still be available to all clients and operations.

Starting a replacement JobTracker

The recovery of the MapReduce cluster is also pretty straightforward. Once the JobTracker process is restarted, all the subsequent MapReduce jobs are successfully processed.

Note that when the JobTracker was killed, any jobs that were in flight are lost and need to be restarted. Watch out for temporary files and directories on HDFS; many MapReduce jobs write temporary data to HDFS that is usually cleaned up on job completion. Failed jobs—especially the ones failed due to a JobTracker failure—are likely to leave such data behind and this may require a manual clean-up.

Have a go hero – moving the JobTracker to a new host

But what happens if the host on which the JobTracker process was running has a fatal hardware failure and cannot be recovered? In such situations you will need to start a new JobTracker process on a different host. This requires all nodes to have their mapred-site.xml file updated with the new location and the cluster restarted. Try this! We'll talk about it more in the next chapter.

Time for action – killing the NameNode process

Let's now kill the NameNode process, which we should expect to directly stop us from accessing HDFS and by extension, prevent the MapReduce jobs from executing:

Note

Don't try this on an operationally important cluster. Though the impact will be short-lived, it effectively kills the entire cluster for a period of time.

  1. Log on to the NameNode host and list the running processes:
    $ jps
    2372 SecondaryNameNode
    2118 NameNode
    2434 JobTracker
    5153 Jps
    
  2. Kill the NameNode process. Don't worry about SecondaryNameNode; it can keep running.
  3. Try to access the HDFS filesystem:
    $ hadoop fs -ls /
    11/12/13 16:00:05 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 0 time(s).
    11/12/13 16:00:06 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 1 time(s).
    11/12/13 16:00:07 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 2 time(s).
    11/12/13 16:00:08 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 3 time(s).
    11/12/13 16:00:09 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 4 time(s).
    
    Bad connection to FS. command aborted.
    
  4. Submit the MapReduce job:
    $ hadoop jar hadoop/hadoop-examples-1.0.4.jar pi 10 100
    Number of Maps = 10
    Samples per Map = 100
    11/12/13 16:00:35 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 0 time(s).
    11/12/13 16:00:36 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 1 time(s).
    11/12/13 16:00:37 INFO ipc.Client: Retrying connect to server: /10.0.0.100:9000. Already tried 2 time(s).
    
    java.lang.RuntimeException: java.net.ConnectException: Call to /10.0.0.100:9000 failed on connection exception: java.net.ConnectException: Connection refused
     at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:371)
     at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:309)
    
    Caused by: java.net.ConnectException: Call to /10.0.0.100:9000 failed on connection exception: java.net.ConnectException: Connection refused
    
    
  5. Check the running processes:
    $ jps
    2372 SecondaryNameNode
    5253 Jps
    2434 JobTracker
    
  6. Restart the NameNode:
    $ start-all.sh
    
  7. Access HDFS:
    $ hadoop fs -ls /
    Found 2 items
    drwxr-xr-x - hadoop supergroup 0 2011-12-16 16:18 /user 
    drwxr-xr-x - hadoop supergroup 0 2011-12-16 16:23 /var 
    

What just happened?

We killed the NameNode process and tried to access the HDFS filesystem. This of course failed; without the NameNode there is no server to receive our filesystem commands.

We then tried to submit a MapReduce job and this also failed. From the abbreviated exception stack trace you can see that while trying to set up the input paths for the job data, the job submission also tried and failed to connect to the NameNode.

We then confirmed that the JobTracker process is healthy and it was the NameNode's unavailability that caused the MapReduce job to fail.

Finally, we restarted the NameNode and confirmed that we could once again access the HDFS filesystem.

Starting a replacement NameNode

With the differences identified so far between the MapReduce and HDFS clusters, it shouldn't be a surprise to learn that starting a new NameNode on a different host is not as simple as moving the JobTracker. To put it more starkly, having to move the NameNode due to a hardware failure is probably the worst crisis you can have with a Hadoop cluster. Unless you have prepared carefully, the chance of losing all your data is very high.

That's quite a statement and we need to explore the nature of the NameNode process to understand why this is the case.

The role of the NameNode in more detail

So far we've spoken of the NameNode process as the coordinator between the DataNode processes and the service responsible for ensuring the configuration parameters, such as block replication values, are honored. This is an important set of tasks but it's also very operationally focused. The NameNode process also has the responsibility of managing the HDFS filesystem metadata; a good analogy is to think of it holding the equivalent of the file allocation table in a traditional filesystem.

File systems, files, blocks, and nodes

When accessing HDFS you rarely care about blocks. You want to access a given file at a certain location in the filesystem. To facilitate this, the NameNode process is required to maintain numerous pieces of information:

  • The actual filesystem contents, the names of all the files, and their containing directories
  • Additional metadata about each of these elements, such as size, ownership, and replication factor
  • The mapping of which blocks hold the data for each file
  • The mapping of which nodes in the cluster hold which blocks and from this, the current replication state of each

All but the last of the preceding points are persistent data that must be maintained across restarts of the NameNode process.
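To make the split between persistent and transient state concrete, here is a minimal Java sketch of the four kinds of information. It is only a model: the class, field, and method names (NameNodeStateSketch, FileEntry, reportReplica, and so on) are invented for illustration and do not mirror Hadoop's internal classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model only; all names here are hypothetical, not Hadoop internals.
public class NameNodeStateSketch {

    // Persistent: per-file metadata plus the ordered list of block IDs.
    static final class FileEntry {
        final String owner;
        final short replication;
        final List<Long> blockIds = new ArrayList<>();

        FileEntry(String owner, short replication) {
            this.owner = owner;
            this.replication = replication;
        }
    }

    // Persistent: path -> file entry (names, metadata, file-to-block mapping).
    final Map<String, FileEntry> namespace = new HashMap<>();

    // Transient: block ID -> hosts currently reporting a replica of it.
    final Map<Long, Set<String>> blockLocations = new HashMap<>();

    void addFile(String path, String owner, short replication, long... blocks) {
        FileEntry entry = new FileEntry(owner, replication);
        for (long b : blocks) {
            entry.blockIds.add(b);
        }
        namespace.put(path, entry);
    }

    // Records that a host has reported holding a replica of the block.
    void reportReplica(long blockId, String host) {
        blockLocations.computeIfAbsent(blockId, k -> new HashSet<>()).add(host);
    }

    // Simulates a NameNode restart: only the transient mapping is forgotten.
    void restart() {
        blockLocations.clear();
    }

    int liveReplicas(long blockId) {
        Set<String> hosts = blockLocations.get(blockId);
        return hosts == null ? 0 : hosts.size();
    }

    public static void main(String[] args) {
        NameNodeStateSketch nn = new NameNodeStateSketch();
        nn.addFile("/user/hadoop/test.txt", "hadoop", (short) 3, 1L, 2L);
        nn.reportReplica(1L, "host1");
        nn.reportReplica(1L, "host2");
        System.out.println("Replicas of block 1: " + nn.liveReplicas(1L));
        nn.restart();
        System.out.println("Files known after restart: " + nn.namespace.size());
        System.out.println("Replicas of block 1 after restart: " + nn.liveReplicas(1L));
    }
}
```

After the simulated restart the namespace is intact but the replica counts are empty until the hosts report in again, which is the behavior the following sections describe.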

The single most important piece of data in the cluster – fsimage

The NameNode process stores two data structures to disk, the fsimage file and the edits log of changes to it. The fsimage file holds the key filesystem attributes mentioned in the previous section; the name and details of each file and directory on the filesystem and the mapping of the blocks that correspond to each.

If the fsimage file is lost, you have a series of nodes holding blocks of data without any knowledge of which blocks correspond to which part of which file. In fact, you don't even know which files are supposed to be constructed in the first place. Loss of the fsimage file leaves you with all the filesystem data but renders it effectively useless.

The fsimage file is read by the NameNode process at startup and is held and manipulated in memory for performance reasons. To avoid changes to the filesystem being lost, any modifications made are written to the edits log throughout the NameNode's uptime. The next time it restarts, it looks for this log at startup and uses it to update the fsimage file which it then reads into memory.
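The replay step can be pictured with a small Java sketch. This is a simplified model under our own assumptions: the image is reduced to a sorted set of paths and the log to a list of create and delete operations. The names EditsReplaySketch, Edit, and Op are hypothetical and say nothing about Hadoop's actual on-disk formats.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified model of applying an edits log to a saved image; not Hadoop code.
public class EditsReplaySketch {

    enum Op { CREATE, DELETE }

    // One logged change to the filesystem namespace.
    static final class Edit {
        final Op op;
        final String path;

        Edit(Op op, String path) {
            this.op = op;
            this.path = path;
        }
    }

    // Applies every logged edit, in order, to a copy of the last saved image.
    static SortedSet<String> replay(SortedSet<String> fsimage, List<Edit> edits) {
        SortedSet<String> updated = new TreeSet<>(fsimage);
        for (Edit e : edits) {
            if (e.op == Op.CREATE) {
                updated.add(e.path);
            } else {
                updated.remove(e.path);
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        SortedSet<String> image = new TreeSet<>(Arrays.asList("/user", "/var"));
        List<Edit> edits = Arrays.asList(
            new Edit(Op.CREATE, "/tmp"),
            new Edit(Op.CREATE, "/user/test.txt"),
            new Edit(Op.DELETE, "/tmp"));
        System.out.println("Image after replay: " + replay(image, edits));
    }
}
```

The longer the log, the more work the loop has to do at startup, which is why keeping the saved image fresh matters.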

Note

This process can be optimized by the use of the SecondaryNameNode which we'll mention later.

DataNode startup

When a DataNode process starts up, it commences its heartbeat process by reporting to the NameNode process on the blocks it holds. As explained earlier in this chapter, this is how the NameNode process knows which node should be used to service a request for a given block. If the NameNode process itself restarts, it uses the re-establishment of the heartbeats with all the DataNode processes to construct its mapping of blocks to nodes.

With the DataNode processes potentially coming in and out of the cluster, there is little use in this mapping being stored persistently as the on-disk state would often be out-of-date with the current reality. This is why the NameNode process does not persist the location of which blocks are held on which nodes.

Safe mode

If you look at the HDFS web UI or the output of dfsadmin shortly after starting an HDFS cluster, you will see a reference to the cluster being in safe mode and the required threshold of the reported blocks before it will leave safe mode. This is the DataNode block reporting mechanism at work.

As an additional safeguard, the NameNode process will hold the HDFS filesystem in a read-only mode until it has confirmed that a given percentage of blocks meet their replication threshold. In the usual case this will simply require all the DataNode processes to report in, but if some have failed, the NameNode process will need to schedule some re-replication before safe mode can be left.
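The decision itself is simple arithmetic, sketched below in Java. Treat this as an illustration of the idea rather than Hadoop's implementation: the method name canLeaveSafeMode is ours, and the 0.999 threshold and minimum of one replica used in main are illustrative values, so check your own cluster configuration for the real ones.

```java
// Illustrative model of the safe mode exit check; not Hadoop's actual code.
public class SafeModeSketch {

    // True when the fraction of blocks with at least minReplicas reported
    // replicas has reached the configured threshold.
    static boolean canLeaveSafeMode(int[] reportedReplicas, int minReplicas,
                                    double thresholdPct) {
        if (reportedReplicas.length == 0) {
            return true; // an empty filesystem has nothing to wait for
        }
        int safeBlocks = 0;
        for (int replicas : reportedReplicas) {
            if (replicas >= minReplicas) {
                safeBlocks++;
            }
        }
        return (double) safeBlocks / reportedReplicas.length >= thresholdPct;
    }

    public static void main(String[] args) {
        // Replica counts reported so far for four blocks; one block has none yet.
        int[] reported = {3, 0, 2, 1};
        System.out.println("Can leave safe mode: "
            + canLeaveSafeMode(reported, 1, 0.999));
    }
}
```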

SecondaryNameNode

The most unfortunately named entity in Hadoop is the SecondaryNameNode. When one learns of the critical fsimage file for the first time, this thing called SecondaryNameNode starts to sound like a helpful mitigation. Is it perhaps, as the name suggests, a second copy of the NameNode process running on another host that can take over when the primary fails? No, it isn't.

SecondaryNameNode has a very specific role: it periodically reads in the state of the fsimage file and the edits log and writes out an updated fsimage file with the changes in the log applied. This is a major time saver in terms of NameNode startup. If the NameNode process has been running for a significant period of time, the edits log will be huge and it will take a very long time (easily several hours) to apply all the changes to the old fsimage file's state stored on the disk. The SecondaryNameNode facilitates a faster startup.

So what to do when the NameNode process has a critical failure?

Would it help to say don't panic? There are approaches to NameNode failure and this is such an important topic that we have an entire section on it in the next chapter. But for now, the main point is that you can configure the NameNode process to write its fsimage file and edits log to multiple locations. Typically, a network filesystem is added as a second location to ensure a copy of the fsimage file outside the NameNode host.

But the process of moving to a new NameNode process on a new host requires manual effort and your Hadoop cluster is dead in the water until you do. This is something you want to have a process for and that you have tried (successfully!) in a test scenario. You really don't want to be learning how to do this when your operational cluster is down, your CEO is shouting at you, and the company is losing money.

BackupNode/CheckpointNode and NameNode HA

Hadoop 0.22 replaced SecondaryNameNode with two new components, BackupNode and CheckpointNode. The latter of these is effectively a renamed SecondaryNameNode; it is responsible for updating the fsimage file at regular checkpoints to decrease the NameNode startup time.

The BackupNode, however, is a step closer to the goal of a fully functional hot-backup for the NameNode. It receives a constant stream of filesystem updates from the NameNode and its in-memory state is up-to-date at any point in time, with the current state held in the master NameNode. If the NameNode dies, the BackupNode is much more capable of being brought into service as a new NameNode. The process isn't automatic and requires manual intervention and a cluster restart, but it takes some of the pain out of a NameNode failure.

Remember that Hadoop 1.0 is a continuation of the Version 0.20 branch, so it does not contain the features mentioned previously.

Hadoop 2.0 will take these extensions to the next logical step: a fully automatic NameNode failover from the current master NameNode to an up-to-date backup NameNode. This NameNode High Availability (HA) is one of the most long-requested changes to the Hadoop architecture and will be a welcome addition when complete.

Hardware failure

When we killed the various Hadoop components earlier, we were—in most cases—using termination of the Hadoop processes as a proxy for the failure of the hosting physical hardware. From experience, it is quite rare to see the Hadoop processes fail without some underlying host issue causing the problem.

Host failure

Actual failure of the host is the simplest case to consider. A machine could fail due to a critical hardware issue (failed CPU, blown power supply, stuck fans, and so on), causing sudden failure of the Hadoop processes running on the host. Critical bugs in system-level software (kernel panics, I/O locks, and so on) can also have the same effect.

Generally speaking, if the failure causes a host to crash, reboot, or otherwise become unreachable for a period of time, we can expect Hadoop to act just as demonstrated throughout this chapter.

Host corruption

A more insidious problem is when a host appears to be functioning but is in reality producing corrupt results. Examples of this could be faulty memory resulting in corruption of data or disk sector errors, resulting in data on the disk being damaged.

For HDFS, this is where the status reports of corrupted blocks that we discussed earlier come into play.

For MapReduce there is no equivalent mechanism. Just as with most other software, the TaskTracker relies on data being written and read correctly by the host and has no means to detect corruption in either task execution or during the shuffle stage.

The risk of correlated failures

There is a phenomenon that most people don't consider until it bites them; sometimes the cause of a failure will also result in subsequent failures and greatly increase the chance of encountering a data loss scenario.

As an example, I once worked on a system that used four networking devices. One of these failed and no one cared about it; there were three remaining devices, after all. Until they all failed in an 18-hour period. Turned out they all contained hard drives from a faulty batch.

It doesn't have to be quite this exotic; more frequent causes will be faults in shared services or facilities. Network switches can fail, power distribution can spike, air conditioning can fail, and equipment racks can short-circuit. As we'll see in the next chapter, Hadoop doesn't assign blocks to random locations; it actively seeks to adopt a placement strategy that provides some protection from such failures in shared services.

We are again talking about unlikely scenarios; most often a failed host is just that and not the tip of a failure-crisis iceberg. However, remember to never discount the unlikely scenarios, especially when taking clusters to progressively larger scale.

Task failure due to software

As mentioned earlier, it is actually relatively rare to see the Hadoop processes themselves crash or otherwise spontaneously fail. What you are likely to see more of in practice are failures caused by the tasks, that is, faults in the map or reduce tasks that you are executing on the cluster.

Failure of slow running tasks

We will first look at what happens if tasks hang or otherwise appear to Hadoop to have stopped making progress.

Time for action – causing task failure

Let's cause a task to fail; before we do, we will need to modify the default timeouts:

  1. Add this configuration property to mapred-site.xml:
    <property>
    <name>mapred.task.timeout</name>
    <value>30000</value>
    </property>
  2. We will now modify our old friend WordCount from Chapter 3, Understanding MapReduce. Copy WordCount3.java to a new file called WordCountTimeout.java and add the following imports:
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FSDataOutputStream;
  3. Replace the map method with the following one:
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String lockfile = "/user/hadoop/hdfs.lock";
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path path = new Path(lockfile);

        // Only the first task to find the lock file missing creates it and sleeps
        if (!hdfs.exists(path)) {
            byte[] bytes = "A lockfile".getBytes();
            FSDataOutputStream out = hdfs.create(path);
            out.write(bytes, 0, bytes.length);
            out.close();
            // Sleep well past the 30-second task timeout configured earlier
            TimeUnit.SECONDS.sleep(100);
        }

        String[] words = value.toString().split(" ");

        for (String str : words) {
            word.set(str);
            context.write(word, one);
        }
    }
  4. Compile the file after changing the class name, jar it up, and execute it on the cluster:
    $ hadoop jar wc.jar WordCountTimeout test.txt output
    
    11/12/11 19:19:51 INFO mapred.JobClient: map 50% reduce 0%
    11/12/11 19:20:25 INFO mapred.JobClient: map 0% reduce 0%
    11/12/11 19:20:27 INFO mapred.JobClient: Task Id : attempt_201112111821_0004_m_000000_0, Status : FAILED
    Task attempt_201112111821_0004_m_000000_0 failed to report status for 32 seconds. Killing!
    11/12/11 19:20:31 INFO mapred.JobClient: map 100% reduce 0%
    11/12/11 19:20:43 INFO mapred.JobClient: map 100% reduce 100%
    11/12/11 19:20:45 INFO mapred.JobClient: Job complete: job_201112111821_0004
    11/12/11 19:20:45 INFO mapred.JobClient: Counters: 18
    11/12/11 19:20:45 INFO mapred.JobClient: Job Counters
    
    

What just happened?

We first modified a default Hadoop property that manages how long a task can seemingly make no progress before the Hadoop framework considers it for termination.

Then we modified WordCount3 to add some logic that causes the task to sleep for 100 seconds. We used a lock file on HDFS to ensure that only a single task instance sleeps. If we just had the sleep statement in the map operation without any checks, every mapper would time out and the job would fail.

Have a go hero – HDFS programmatic access

We said we would not really deal with programmatic access to HDFS in this book. However, take a look at what we have done here and browse through the Javadoc for these classes. You will find that the interface largely follows the patterns for access to a standard Java filesystem.

Then we compile, jar up the classes, and execute the job on the cluster. The first task goes to sleep and after exceeding the threshold we set (the value was specified in milliseconds), Hadoop kills the task and reschedules another mapper to process the split assigned to the failed task.

Hadoop's handling of slow-running tasks

Hadoop has a balancing act to perform here. It wants to terminate tasks that have got stuck or, for other reasons, are running abnormally slowly; but sometimes complex tasks simply take a long time. This is especially true if the task relies on any external resources to complete its execution.

Hadoop looks for evidence of progress from a task when deciding how long it has been idle/quiet/stuck. Generally this could be:

  • Emitting results
  • Writing values to counters
  • Explicitly reporting progress

For the latter, Hadoop provides the Progressable interface which contains one method of interest:

public void progress();

The Context class implements this interface, so any mapper or reducer can call context.progress() to show it is alive and continuing to process.
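The bookkeeping behind this can be modeled in a few lines of plain Java. The sketch below is not Hadoop code: TaskTimeoutSketch and its methods are hypothetical names, and the clock is passed in explicitly so the behavior is easy to follow. It assumes the 30-second timeout we configured earlier in mapred-site.xml.

```java
// Minimal model of a progress-based timeout; names are hypothetical.
public class TaskTimeoutSketch {

    private final long timeoutMillis;
    private long lastProgressMillis;

    TaskTimeoutSketch(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastProgressMillis = startMillis;
    }

    // Called whenever the task emits output, updates a counter,
    // or explicitly reports progress.
    void progress(long nowMillis) {
        lastProgressMillis = nowMillis;
    }

    // True once the task has been silent for longer than the timeout.
    boolean shouldKill(long nowMillis) {
        return nowMillis - lastProgressMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        TaskTimeoutSketch sleepy = new TaskTimeoutSketch(30_000L, 0L);
        System.out.println("Silent for 32s, kill? " + sleepy.shouldKill(32_000L));

        TaskTimeoutSketch busy = new TaskTimeoutSketch(30_000L, 0L);
        busy.progress(25_000L); // the equivalent of calling context.progress()
        System.out.println("Reported at 25s, kill at 32s? " + busy.shouldKill(32_000L));
    }
}
```

A long-running but healthy task only needs to report often enough to keep resetting the clock.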

Speculative execution

Typically, a MapReduce job will comprise many discrete map and reduce task executions. When run across a cluster, there is a real risk that a misconfigured or ill host will cause its tasks to run significantly slower than the others.

To address this, Hadoop will assign duplicate map or reduce tasks across the cluster towards the end of the map or reduce phase. This speculative task execution is aimed at preventing one or two slow-running tasks from causing a significant impact on the overall job execution time.
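One way to picture the selection of candidates is the toy heuristic below, written in plain Java. It is an assumption of ours for illustration only: it flags unfinished tasks whose progress trails the average by more than a chosen gap. Hadoop's real scheduler takes more factors into account, and the name stragglers and the 0.2 gap are not taken from Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Toy straggler detection; an illustrative heuristic, not Hadoop's scheduler.
public class SpeculationSketch {

    // Returns the indexes of unfinished tasks whose progress (0.0 to 1.0)
    // trails the mean progress by more than the given gap.
    static List<Integer> stragglers(double[] progress, double gap) {
        List<Integer> result = new ArrayList<>();
        if (progress.length == 0) {
            return result;
        }
        double sum = 0.0;
        for (double p : progress) {
            sum += p;
        }
        double mean = sum / progress.length;
        for (int i = 0; i < progress.length; i++) {
            if (progress[i] < 1.0 && progress[i] < mean - gap) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three tasks are nearly done; the fourth is lagging badly.
        double[] progress = {0.95, 0.9, 1.0, 0.3};
        System.out.println("Candidates for a duplicate attempt: "
            + stragglers(progress, 0.2));
    }
}
```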

Hadoop's handling of failing tasks

Tasks won't just hang; sometimes they'll explicitly throw exceptions, abort, or otherwise stop executing in a less silent way than the ones mentioned previously.

Hadoop has three configuration properties that control how it responds to task failures, all set in mapred-site.xml:

  • mapred.map.max.attempts: A given map task will be retried this many times before causing the job to fail
  • mapred.reduce.max.attempts: A given reduce task will be retried this many times before causing the job to fail
  • mapred.max.tracker.failures: The job will fail if this many individual task failures are recorded

The default value for all of these is 4.

Note

Note that it does not make sense for mapred.max.tracker.failures to be set to a value smaller than either of the other two properties.

Which of these you consider setting will depend on the nature of your data and jobs. If your jobs access external resources that may occasionally cause transient errors, increasing the number of repeat failures of a task may be useful. But if the task is very data-specific, these properties may be less applicable as a task that fails once will do so again. However, note that a default value higher than 1 does make sense as in a large complex system various transient failures are always possible.
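The effect of a maximum-attempts setting can be sketched in plain Java as a simple retry loop. This is a model of the idea only; runWithRetries is a hypothetical helper, and the real framework reschedules attempts across hosts rather than looping in one place.

```java
// Model of retrying a task up to a maximum number of attempts; not Hadoop code.
public class TaskRetrySketch {

    // Runs the task until it succeeds or maxAttempts is exhausted.
    // Returns the number of the attempt that succeeded.
    static int runWithRetries(Runnable task, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.run();
                return attempt;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException(
            "Task failed " + maxAttempts + " times; failing the job", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // A task hitting a transient error on its first two attempts.
        int succeededOn = runWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient error");
            }
        }, 4);
        System.out.println("Succeeded on attempt " + succeededOn);
    }
}
```

A task that fails deterministically, such as one tripping over a bad record, exhausts every attempt and fails the job no matter how high the limit is set.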

Have a go hero – causing tasks to fail

Modify the WordCount example; instead of sleeping, have it throw a RuntimeException based on a random number. Modify the cluster configuration and explore the relationship between the configuration properties that manage how many failed tasks will cause the whole job to fail.

Task failure due to data

The final types of failure that we will explore are those related to data. By this, we mean tasks that crash because a given record had corrupt data, used the wrong data types or formats, or a wide variety of related problems. We mean those cases where the data received diverges from expectations.

Handling dirty data through code

One approach to dirty data is to write mappers and reducers that deal with data defensively. So, for example, if the value received by the mapper should be a comma-separated list of values, first validate the number of items before processing the data. If the first value should be a string representation of an integer, ensure that the conversion into a numerical type has solid error handling and default behavior.
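A defensive parse along these lines might look like the following Java sketch. The three-field record layout and the name parseId are assumptions made for the example; a real mapper would apply the same checks to the Text value it receives and decide whether to skip, count, or log a rejected record.

```java
import java.util.OptionalInt;

// Defensive parsing of a comma-separated record; the layout is hypothetical.
public class DefensiveParseSketch {

    static final int EXPECTED_FIELDS = 3; // assumed layout: id,name,score

    // Returns the integer ID from the first field, or empty if the record
    // does not match the expected shape.
    static OptionalInt parseId(String line) {
        if (line == null) {
            return OptionalInt.empty();
        }
        // A limit of -1 keeps trailing empty fields so the count is accurate.
        String[] fields = line.split(",", -1);
        if (fields.length != EXPECTED_FIELDS) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(fields[0].trim()));
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        String[] samples = {"42,alice,7", "x,alice,7", "42,alice", ""};
        for (String s : samples) {
            System.out.println("\"" + s + "\" -> " + parseId(s));
        }
    }
}
```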

The problem with this approach is that there will always be some type of weird data input that was not considered, no matter how careful you were. Did you consider receiving values in a different unicode character set? What about multiple character sets, null values, badly terminated strings, wrongly encoded escape characters, and so on?

If the data input to your jobs is something you generate and/or control, these possibilities are less of a concern. However, if you are processing data received from external sources, there will always be grounds for surprise.

Using Hadoop's skip mode

The alternative is to configure Hadoop to approach task failures differently. Instead of looking upon a failed task as an atomic event, Hadoop can instead attempt to identify which records may have caused the problem and exclude them from future task executions. This mechanism is known as skip mode. This can be useful if you are experiencing a wide variety of data issues where coding around them is not desirable or practical. Alternatively, you may have little choice if, within your job, you are using third-party libraries for which you may not have the source code.

Skip mode is currently available only for jobs written to the pre-0.20 version of the API, which is another consideration.

Time for action – handling dirty data by using skip mode

Let's see skip mode in action by writing a MapReduce job that receives the data that causes it to fail:

  1. Save the following Ruby script as gendata.rb:
    File.open("skipdata.txt", "w") do |file|
      3.times do
        500000.times{file.write("A valid record\n")}
        5.times{file.write("skiptext\n")}
      end
      500000.times{file.write("A valid record\n")}
    end
  2. Run the script:
    $ ruby gendata.rb 
    
  3. Check the size of the generated file and its number of lines:
    $ ls -lh skipdata.txt
    -rw-rw-r-- 1 hadoop hadoop 29M 2011-12-17 01:53 skipdata.txt
    $ cat skipdata.txt | wc -l
    2000015
    
  4. Copy the file onto HDFS:
    $ hadoop fs -put skipdata.txt skipdata.txt
    
  5. Add the following property definition to mapred-site.xml:
    <property>
    <name>mapred.skip.map.max.skip.records</name>
    <value>5</value>
    </property>
  6. Check the value set for mapred.map.max.attempts and set it to 20 if it is lower.
  7. Save the following Java file as SkipData.java:
    import java.io.IOException;

    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.*;

    public class SkipData
    {

        public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable>
        {

            private final static LongWritable one = new LongWritable(1);
            private Text word = new Text("totalcount");

            public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException
            {
                String line = value.toString();

                if (line.equals("skiptext"))
                    throw new RuntimeException("Found skiptext");  // simulate a crash on dirty data
                output.collect(word, one);  // count one valid record
            }
        }

        public static void main(String[] args) throws Exception
        {
            Configuration config = new Configuration();
            JobConf conf = new JobConf(config, SkipData.class);
            conf.setJobName("SkipData");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(LongWritable.class);

            conf.setMapperClass(MapClass.class);
            conf.setCombinerClass(LongSumReducer.class);
            conf.setReducerClass(LongSumReducer.class);

            FileInputFormat.setInputPaths(conf, args[0]);
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }
  8. Compile this file and jar it into skipdata.jar.
  9. Run the job:
    $ hadoop jar skipdata.jar SkipData skipdata.txt output
    
    11/12/16 17:59:07 INFO mapred.JobClient: map 45% reduce 8%
    11/12/16 17:59:08 INFO mapred.JobClient: Task Id : attempt_201112161623_0014_m_000003_0, Status : FAILED
    java.lang.RuntimeException: Found skiptext
     at SkipData$MapClass.map(SkipData.java:26)
     at SkipData$MapClass.map(SkipData.java:12)
     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
     at org.apache.hadoop.mapred.Child.main(Child.java:170)
    11/12/16 17:59:11 INFO mapred.JobClient: map 42% reduce 8%
    ...
    11/12/16 18:01:26 INFO mapred.JobClient: map 70% reduce 16%
    11/12/16 18:01:35 INFO mapred.JobClient: map 71% reduce 16%
    11/12/16 18:01:43 INFO mapred.JobClient: Task Id : attempt_201112161623_0014_m_000003_2, Status : FAILED
    java.lang.RuntimeException: Found skiptext
    ...
    11/12/16 18:12:44 INFO mapred.JobClient: map 99% reduce 29%
    11/12/16 18:12:50 INFO mapred.JobClient: map 100% reduce 29%
    11/12/16 18:13:00 INFO mapred.JobClient: map 100% reduce 100%
    11/12/16 18:13:02 INFO mapred.JobClient: Job complete: job_201112161623_0014
    ...
    
  10. Examine the contents of the job output file:
    $ hadoop fs -cat output/part-00000
    totalcount 2000000
    
  11. Look in the output directory for skipped records:
    $ hadoop fs -ls output/_logs/skip
    Found 15 items
    -rw-r--r-- 3 hadoop supergroup 203 2011-12-16 18:05 /user/hadoop/output/_logs/skip/attempt_201112161623_0014_m_000001_3
    -rw-r--r-- 3 hadoop supergroup 211 2011-12-16 18:06 /user/hadoop/output/_logs/skip/attempt_201112161623_0014_m_000001_4
    
    
  12. Check the job details from the MapReduce UI to observe the recorded statistics as shown in the following screenshot:

What just happened?

We had to do a lot of setup here so let's walk through it a step at a time.

Firstly, we needed to configure Hadoop to use skip mode; it is disabled by default. The key configuration property was set to 5, meaning that we didn't want the framework to skip any set of records greater than this number. Note that this includes the invalid records, and by setting this property to 0 (the default) Hadoop will not enter skip mode.

We also check to ensure that Hadoop is configured with a sufficiently high threshold for repeated task attempt failures, which we will explain shortly.

Next we needed a test file that we could use to simulate dirty data. We wrote a simple Ruby script that generated a file with 2 million lines that we would treat as valid with three sets of five bad records interspersed through the file. We ran this script and confirmed that the generated file did indeed have 2,000,015 lines. This file was then put on HDFS where it would be the job input.

We then wrote a simple MapReduce job that effectively counts the number of valid records. Every time a line read from the input is valid text, we emit an additional count of 1 to what will be aggregated as a final total. When the invalid lines are encountered, the mapper fails by throwing an exception.

We then compile this file, jar it up, and run the job. The job takes a while to run and as seen from the extracts of the job status, it follows a pattern that we have not seen before. The map progress counter will increase but when a task fails, the progress will drop back then start increasing again. This is skip mode in action.

Every time a key/value pair is passed to the mapper, Hadoop by default increments a counter that allows it to keep track of which record caused a failure.

Tip

If your map or reduce tasks process their input through mechanisms other than directly receiving all data via the arguments to the map or reduce method (for example, from asynchronous processes or caches) you will need to ensure you explicitly update this counter manually.

When a task fails, Hadoop retries it on the same block but attempts to work around the invalid records. Through a binary search approach, the framework performs retries across the data until the number of skipped records is no greater than the maximum value we configured earlier, that is 5. This process does require multiple task retries and failures as the framework seeks the optimal batch to skip, which is why we had to ensure the framework was configured to be tolerant of a higher-than-usual number of repeated task failures.
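The narrowing idea can be sketched in plain Java. This is a simplified model under our own assumptions, not Hadoop's implementation: each loop iteration stands in for one task attempt over half of the suspect range, the range is narrowed around the first failing record, and the names narrow and containsBad are hypothetical.

```java
import java.util.function.LongPredicate;

// Simplified bisection model of skip mode; not Hadoop's actual algorithm.
public class SkipBisectionSketch {

    // Narrows [lo, hi), which must contain at least one bad record, until it
    // holds no more than maxSkip records. Returns {lo, hi, attempts}.
    static long[] narrow(long lo, long hi, long maxSkip, LongPredicate isBad) {
        if (maxSkip < 1) {
            throw new IllegalArgumentException("maxSkip must be at least 1");
        }
        long attempts = 0;
        while (hi - lo > maxSkip) {
            long mid = lo + (hi - lo) / 2;
            attempts++; // one simulated task attempt over [lo, mid)
            if (containsBad(lo, mid, isBad)) {
                hi = mid; // failure reproduced: keep the first half
            } else {
                lo = mid; // first half is clean: keep the second half
            }
        }
        return new long[] {lo, hi, attempts};
    }

    // Stands in for running the task over a record range and seeing it fail.
    static boolean containsBad(long lo, long hi, LongPredicate isBad) {
        for (long i = lo; i < hi; i++) {
            if (isBad.test(i)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 1,000 records with five bad ones starting at record 500.
        long[] r = narrow(0, 1000, 5, i -> i >= 500 && i < 505);
        System.out.println("Skip range: [" + r[0] + ", " + r[1] + ") after "
            + r[2] + " attempts");
    }
}
```

Each halving costs a task attempt, so the number of attempts grows with the logarithm of the range size. In this model, halving a range of 500,000 records down to five takes about 17 steps, which gives a feel for why a low attempt limit would end the job before the bad records were isolated.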

We watched the job continue following this back and forth process and on completion checked the contents of the output file. This showed 2,000,000 processed records, that is the correct number of valid records in our input file. Hadoop successfully managed to skip only the three sets of five invalid records.

We then looked within the _logs directory in the job output directory and saw that there is a skip directory containing the sequence files of the skipped records.

Finally, we looked at the MapReduce web UI to see the overall job status, which included both the number of records processed while in skip mode as well as the number of records skipped. Note that the total number of failed tasks was 22, which is greater than our threshold for failed map attempts, but this number is aggregate failures across multiple tasks.

To skip or not to skip...

Skip mode can be very effective but as we have seen previously, there is a performance penalty caused by Hadoop having to determine which record range to skip. Our test file was actually quite helpful to Hadoop; the bad records were nicely grouped in three groups and only accounted for a tiny fraction of the overall data set. If there were many more invalid records in the input data and they were spread much more widely across the file, a more effective approach may have been to use a precursor MapReduce job to filter out all the invalid records.
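The filtering alternative can be sketched as follows. This is plain Java rather than a complete MapReduce job, and the validity rule is purely hypothetical: we assume a record is valid when it has exactly the expected number of non-empty, tab-separated fields. In a real precursor job, a check like `isValid` would sit inside a mapper that emits only the records that pass.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record filter; the validity rule is an assumption for illustration. */
final class RecordFilterSketch {

    /** Assumed rule: exactly expectedFields tab-separated fields, none of them blank. */
    static boolean isValid(String line, int expectedFields) {
        if (line == null) {
            return false;
        }
        // A limit of -1 keeps trailing empty fields so they can be rejected.
        String[] fields = line.split("\t", -1);
        if (fields.length != expectedFields) {
            return false;
        }
        for (String field : fields) {
            if (field.trim().isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /** Returns only the lines that pass the validity check, in their original order. */
    static List<String> keepValid(List<String> lines, int expectedFields) {
        List<String> valid = new ArrayList<>();
        for (String line : lines) {
            if (isValid(line, expectedFields)) {
                valid.add(line);
            }
        }
        return valid;
    }
}
```

Unlike skip mode, a filter of this kind costs one pass over the data regardless of how many bad records there are or how they are distributed, but it only works when invalid records can be recognized up front.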

This is why we have presented the topics of writing code to handle bad data and using skip mode consecutively. Both are valid techniques that you should have in your tool belt. There is no single answer as to when one or the other is the best approach; you need to consider the input data, performance requirements, and opportunities for hardcoding before making a decision.

Summary

We have caused a lot of destruction in this chapter and I hope you never have to deal with this much failure in a single day with an operational Hadoop cluster. There are some key learning points from the experience.

In general, component failures are not something to fear in Hadoop. Particularly with large clusters, failure of some component or host will be pretty commonplace and Hadoop is engineered to handle this situation. HDFS, with its responsibility to store data, actively manages the replication of each block and schedules new copies to be made when the DataNode processes die.

MapReduce has a stateless approach to TaskTracker failure and in general simply schedules duplicate tasks if one fails. It may also do this to prevent misbehaving hosts from slowing down the whole job.

Failure of the HDFS and MapReduce master nodes is a more significant failure. In particular, the NameNode process holds critical filesystem data and you must actively ensure you have it set up to allow a new NameNode process to take over.

In general, hardware failures will look much like the previous process failures, but always be aware of the possibility of correlated failures. If tasks fail due to software errors, Hadoop will retry them within configurable thresholds. Data-related errors can be worked around by employing skip mode, though it will come with a performance penalty.

Now that we know how to handle failures in our cluster, we will spend the next chapter working through the broader issues of cluster setup, health, and maintenance.