Hadoop:Data Processing and Modelling
上QQ阅读APP看书,第一时间看更新

Chapter 5. Advanced MapReduce Techniques

Now that we have looked at a few details of the fundamentals of MapReduce and its usage, it's time to examine some more techniques and concepts involved in MapReduce. This chapter will cover the following topics:

  • Performing joins on data
  • Implementing graph algorithms in MapReduce
  • How to represent complex datatypes in a language-independent fashion

Along the way, we'll use the case studies as examples in order to highlight other aspects such as tips and tricks and identifying some areas of best practice.

Simple, advanced, and in-between

Including the word "advanced" in a chapter title is a little dangerous, as complexity is a subjective concept. So let's be very clear about the material covered here. We don't, for even a moment, suggest that this is the pinnacle of distilled wisdom that would otherwise take years to acquire. Conversely, we also don't claim that some of the techniques and problems covered in this chapter will have occurred to someone new to the world of Hadoop.

For the purposes of this chapter, therefore, we use the term "advanced" to cover things that you don't see in the first days or weeks, or wouldn't necessarily appreciate if you did. These are some techniques that provide both specific solutions to particular problems but also highlight ways in which the standard Hadoop and related APIs can be employed to address problems that are not obviously suited to the MapReduce processing model. Along the way, we'll also point out some alternative approaches that we don't implement here but which may be useful sources for further research.

Our first case study is a very common example of this latter case; performing join-type operations within MapReduce.

Joins

Few problems use a single set of data. In many cases, there are easy ways to obviate the need to try and process numerous discrete yet related data sets within the MapReduce framework.

The analogy here is, of course, to the concept of join in a relational database. It is very natural to segment data into numerous tables and then use SQL statements that join tables together to retrieve data from multiple sources. The canonical example is where a main table has only ID numbers for particular facts, and joins against other tables are used to extract data about the information referred to by the unique ID.

When this is a bad idea

It is possible to implement joins in MapReduce. Indeed, as we'll see, the problem is less about the ability to do it and more the choice of which of many potential strategies to employ.

However, MapReduce joins are often difficult to write and easy to make inefficient. Work with Hadoop for any length of time, and you will come across a situation where you need to do it. However, if you very frequently need to perform MapReduce joins, you may want to ask yourself if your data is well structured and more relational in nature than you first assumed. If so, you may want to consider Apache Hive (the main topic of Chapter 8, A Relational View on Data with Hive) or Apache Pig (briefly mentioned in the same chapter). Both provide additional layers atop Hadoop that allow data processing operations to be expressed in high-level languages; in the case of Hive, through a variant of SQL.

Map-side versus reduce-side joins

That caveat out of the way, there are two basic approaches to join data in Hadoop and these are given their names depending on where in the job execution the join occurs. In either case, we need to bring multiple data streams together and perform the join through some logic. The basic difference between these two approaches is whether the multiple data streams are combined within the mapper or reducer functions.

Map-side joins, as the name implies, read the data streams into the mapper and uses logic within the mapper function to perform the join. The great advantage of a map-side join is that by performing all joining—and more critically data volume reduction—within the mapper, the amount of data transferred to the reduce stage is greatly minimized. The drawback of map-side joins is that you either need to find a way of ensuring one of the data sources is very small or you need to define the job input to follow very specific criteria. Often, the only way to do that is to preprocess the data with another MapReduce job whose sole purpose is to make the data ready for a map-side join.

In contrast, a reduce-side join has the multiple data streams processed through the map stage without performing any join logic and does the joining in the reduce stage. The potential drawback of this approach is that all the data from each source is pulled through the shuffle stage and passed into the reducers, where much of it may then be discarded by the join operation. For large data sets, this can become a very significant overhead.

The main advantage of the reduce-side join is its simplicity; you are largely responsible for how the jobs are structured and it is often quite straightforward to define a reduce-side join approach for related data sets. Let's look at an example.

Matching account and sales information

A common situation in many companies is that sales records are kept separate from the client data. There is, of course, a relationship between the two; usually a sales record contains the unique ID of the user account through which the sale was performed.

In the Hadoop world, these would be represented by two types of data files: one containing records of the user IDs and information for sales, and the other would contain the full data for each user account.

Frequent tasks require reporting that uses data from both these sources; say, for example, we wanted to see the total number of sales and total value for each user but do not want to associate it with an anonymous ID number, but rather with a name. This may be valuable when customer service representatives wish to call the most frequent customers—data from the sales records—but want to be able to refer to the person by name and not just a number.

Time for action – reduce-side join using MultipleInputs

We can perform the report explained in the previous section using a reduce-side join by performing the following steps:

  1. Create the following tab-separated file and name it sales.txt:
    00135.992012-03-15
    00212.492004-07-02
    00413.422005-12-20
    003499.992010-12-20
    00178.952012-04-02
    00221.992006-11-30
    00293.452008-09-10
    0019.992012-05-17
  2. Create the following tab-separated file and name it accounts.txt:
    001John AllenStandard2012-03-15
    002Abigail SmithPremium2004-07-13
    003April StevensStandard2010-12-20
    004Nasser HafezPremium2001-04-23
  3. Copy the datafiles onto HDFS.
    $ hadoop fs -mkdir sales
    $ hadoop fs -put sales.txt sales/sales.txt
    $ hadoop fs -mkdir accounts
    $ hadoop fs -put accounts/accounts.txt
    
  4. Create the following file and name it ReduceJoin.java:
    import java.io.* ;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    
    public class ReduceJoin
    {
    
        public static class SalesRecordMapper
    extends Mapper<Object, Text, Text, Text>
    {
    
            public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException
            {
                String record = value.toString() ;
                String[] parts = record.split("\t") ;
    
                context.write(new Text(parts[0]), new 
    Text("sales\t"+parts[1])) ;
            }
        }
    
        public static class AccountRecordMapper
    extends Mapper<Object, Text, Text, Text>
    {
            public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException
            {
                String record = value.toString() ;
                String[] parts = record.split("\t") ;
    
                context.write(new Text(parts[0]), new 
    Text("accounts\t"+parts[1])) ;
           }
        }
    
        public static class ReduceJoinReducer
        extends Reducer<Text, Text, Text, Text>
        {
    
            public void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException
            {
                String name = "" ;
    double total = 0.0 ;
                int count = 0 ;
    
                for(Text t: values)
                {
                    String parts[] = t.toString().split("\t") ;
    
                    if (parts[0].equals("sales"))
                    {
                        count++ ;
                        total+= Float.parseFloat(parts[1]) ;
                    }
                    else if (parts[0].equals("accounts"))
                    {
                        name = parts[1] ;
                    }
                }
    
                String str = String.format("%d\t%f", count, total) ;
                context.write(new Text(name), new Text(str)) ;
            }
        }
    
        public static void main(String[] args) throws Exception 
    {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "Reduce-side join");
            job.setJarByClass(ReduceJoin.class);
            job.setReducerClass(ReduceJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    MultipleInputs.addInputPath(job, new Path(args[0]), 
    TextInputFormat.class, SalesRecordMapper.class) ;
    MultipleInputs.addInputPath(job, new Path(args[1]), 
    TextInputFormat.class, AccountRecordMapper.class) ;
            Path outputPath = new Path(args[2]);
            FileOutputFormat.setOutputPath(job, outputPath);
    outputPath.getFileSystem(conf).delete(outputPath);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  5. Compile the file and add it to a JAR file.
    $ javac ReduceJoin.java
    $ jar -cvf join.jar *.class
    
  6. Run the job by executing the following command:
    $ hadoop jar join.jarReduceJoin sales accounts outputs
    
  7. Examine the result file.
    $ hadoop fs -cat /user/garry/outputs/part-r-00000
    John Allen3124.929998
    Abigail Smith3127.929996
    April Stevens1499.989990
    Nasser Hafez113.420000
    

What just happened?

Firstly, we created the datafiles to be used in this example. We created two small data sets as this makes it easier to track the result output. The first data set we defined was the account details with four columns, as follows:

  • The account ID
  • The client name
  • The type of account
  • The date the account was opened

We then created a sales record with three columns:

  • The account ID of the purchaser
  • The value of the sale
  • The date of the sale

Naturally, real account and sales records would have many more fields than the ones mentioned here. After creating the files, we placed them onto HDFS.

We then created the ReduceJoin.java file, which looks very much like the previous MapReduce jobs we have used. There are a few aspects to this job that make it special and allow us to implement a join.

Firstly, the class has two defined mappers. As we have seen before, jobs can have multiple mappers executed in a chain; but in this case, we wish to apply different mappers to each of the input locations. Accordingly, we have the sales and account data defined into the SalesRecordMapper and AccountRecordMapper classes. We used the MultipleInputs class from the org.apache.hadoop.mapreduce.lib.io package as follows:

MultipleInputs.addInputPath(job, new Path(args[0]), 
TextInputFormat.class, SalesRecordMapper.class) ;
MultipleInputs.addInputPath(job, new Path(args[1]), 
TextInputFormat.class, AccountRecordMapper.class) ;

As you can see, unlike in previous examples where we add a single input location, the MultipleInputs class allows us to add multiple sources and associate each with a distinct input format and mapper.

The mappers are pretty straightforward; the SalesRecordMapper class emits an output of the form <account number>, <sales value> while the AccountRecordMapper class emits an output of the form <account number>, <client name>. We therefore have the order value and client name for each sale being passed into the reducer where the actual join will happen.

Notice that both mappers actually emit more than the required values. The SalesRecordMapper class prefixes its value output with sales while the AccountRecordMapper class uses the tag account.

If we look at the reducer, we can see why this is so. The reducer retrieves each record for a given key, but without these explicit tags we would not know if a given value came from the sales or account mapper and hence would not understand how to treat the data value.

The ReduceJoinReducer class therefore treats the values in the Iterator object differently, depending on which mapper they came from. Values from the AccountRecordMapper class—and there should be only one—are used to populate the client name in the final output. For each sales record—likely to be multiple, as most clients buy more than a single item—the total number of orders is counted as is the overall combined value. The output from the reducer is therefore a key of the account holder name and a value string containing the number of orders and the total order value.

We compile and execute the class; notice how we provide three arguments representing the two input directories as well as the single output source. Because of how the MultipleInputs class is configured, we must also ensure we specify the directories in the right order; there is no dynamic mechanism to determine which type of file is in which location.

After execution, we examine the output file and confirm that it does indeed contain the overall totals for named clients as expected.

DataJoinMapper and TaggedMapperOutput

There is a way of implementing a reduce-side join in a more sophisticated and object-oriented fashion. Within the org.apache.hadoop.contrib.join package are classes such as DataJoinMapperBase and TaggedMapOutput that provide an encapsulated means of deriving the tags for map output and having them processed at the reducer. This mechanism means you don't have to define explicit tag strings as we did previously and then carefully parse out the data received at the reducer to determine from which mapper the data came; there are methods in the provided classes that encapsulate this functionality.

This capability is particularly valuable when using numeric or other non-textual data. For creating our own explicit tags as in the previous example, we would have to convert types such as integers into strings to allow us to add the required prefix tag. This will be more inefficient than using the numeric types in their normal form and relying on the additional classes to implement the tag.

The framework allows for quite sophisticated tag generation as well as concepts such as tag grouping that we didn't implement previously. There is additional work required to use this mechanism that includes overriding additional methods and using a different map base class. For straightforward joins such as in the previous example, this framework may be overkill, but if you find yourself implementing very complex tagging logic, it may be worth a look.

Implementing map-side joins

For a join to occur at a given point, we must have access to the appropriate records from each data set at that point. This is where the simplicity of the reduce-side join comes into its own; though it incurs the expense of additional network traffic, processing it by definition ensures that the reducer has all records associated with the join key.

If we wish to perform our join in the mapper, it isn't as easy to make this condition hold true. We can't assume that our input data is sufficiently well structured to allow associated records to be read simultaneously. We generally have two classes of approach here: obviate the need to read from multiple external sources or preprocess the data so that it is amenable for map-side joining.

Using the Distributed Cache

The simplest way of realizing the first approach is to take all but one data set and make it available in the Distributed Cache that we used in the previous chapter. The approach can be used for multiple data sources, but for simplicity let's discuss just two.

If we have one large data set and one smaller one, such as with the sales and account info earlier, one option would be to package up the account info and push it into the Distributed Cache. Each mapper would then read this data into an efficient data structure, such as a hash table that uses the join key as the hash key. The sales records are then processed, and during the processing of record each the needed account information can be retrieved from the hash table.

This mechanism is very effective and when one of the smaller data sets can easily fit into memory, it is a great approach. However, we are not always that lucky, and sometimes the smallest data set is still too large to be copied to every worker machine and held in memory.

Have a go hero - Implementing map-side joins

Take the previous sales/account record example and implement a map-side join using the Distributed Cache. If you load the account records into a hash table that maps account ID numbers to client names, you can use the account ID to retrieve the client name. Do this within the mapper while processing the sales records.

Pruning data to fit in the cache

If the smallest data set is still too big to be used in the Distributed Cache, all is not necessarily lost. Our earlier example, for instance, extracted only two fields from each record and discarded the other fields not required by the job. In reality, an account will be described by many attributes, and this sort of reduction will limit the data size dramatically. Often the data available to Hadoop is this full data set, but what we need is only a subset of the fields.

In such a case, therefore, it may be possible to extract from the full data set only the fields that are needed during the MapReduce job, and in doing so create a pruned data set that is small enough to be used in the cache.

Note

This is a very similar concept to the underlying column-oriented databases . Traditional relational databases store data a row at a time, meaning that the full row needs to be read to extract a single column. A column-based database instead stores each column separately, allowing a query to read only the columns in which it is interested.

If you take this approach, you need to consider what mechanism will be used to generate the data subset and how often this will be done. The obvious approach is to write another MapReduce job that does the necessary filtering and this output is then used in the Distributed Cache for the follow-on job. If the smaller data set changes only rarely, you may be able to get away with generating the pruned data set on a scheduled basis; for example, refresh it every night. Otherwise, you will need to make a chain of two MapReduce jobs: one to produce the pruned data set and the other to perform the join operation using the large set and the data in the Distributed Cache.

Using a data representation instead of raw data

Sometimes, one of the data sources is not used to retrieve additional data but is instead used to derive some fact that is then used in a decision process. We may, for example, be looking to filter sales records to extract only those for which the shipping address was in a specific locale.

In such a case, we can reduce the required data size down to a list of the applicable sales records that may more easily fit into the cache. We can again store it as a hash table, where we are just recording the fact that the record is valid, or even use something like a sorted list or a tree. In cases where we can accept some false positives while still guaranteeing no false negatives, a Bloom filter provides an extremely compact way of representing such information.

As can be seen, applying this approach to enable a map-side join requires creativity and not a little luck in regards to the nature of the data set and the problem at hand. But remember that the best relational database administrators spend significant time optimizing queries to remove unnecessary data processing; so it's never a bad idea to ask if you truly need to process all that data.

Using multiple mappers

Fundamentally, the previous techniques are trying to remove the need for a full cross data set join. But sometimes this is what you have to do; you may simply have very large data sets that cannot be combined in any of these clever ways.

There are classes within the org.apache.hadoop.mapreduce.lib.join package that support this situation. The main class of interest is CompositeInputFormat, which applies a user-defined function to combine records from multiple data sources.

The main limitation of this approach is that the data sources must already be indexed based on the common key, in addition to being both sorted and partitioned in the same way. The reason for this is simple: when reading from each source, the framework needs to know if a given key is present at each location. If we know that each partition is sorted and contains the same key range, simple iteration logic can do the required matching.

This situation is obviously not going to happen by accident, so again you may find yourself writing preprocess jobs to transform all the input data sources into the correct sort and partition structure.

Note

This discussion starts to touch on distributed and parallel join algorithms; both topics are of extensive academic and commercial research. If you are interested in the ideas and want to learn more of the underlying theory, go searching on http://scholar.google.com.

To join or not to join...

After our tour of joins in the MapReduce world, let's come back to the original question: are you really sure you want to be doing this? The choice is often between a relatively easily implemented yet inefficient reduce-side join, and more efficient but more complex map-side alternatives. We have seen that joins can indeed be implemented in MapReduce, but they aren't always pretty. This is why we advise the use of something like Hive or Pig if these types of problems comprise a large portion of your workload. Obviously, we can use tools such as those that do their own translation into MapReduce code under the hood and directly implement both map-side and reduce-side joins, but it's often better to use a well-engineered and well-optimized library for such workloads instead of building your own. That is after all why you are using Hadoop and not writing your own distributed processing framework!

Graph algorithms

Any good computer scientist will tell you that the graph data structure is one of the most powerful tools around. Many complex systems are best represented by graphs and a body of knowledge going back at least decades (centuries if you get more mathematical about it) provides very powerful algorithms to solve a vast variety of graph problems. But by their very nature, graphs and their algorithms are often very difficult to imagine in a MapReduce paradigm.

Graph 101

Let's take a step back and define some terminology. A graph is a structure comprising of nodes (also called vertices) that are connected by links called edges . Depending on the type of graph, the edges may be bidirectional or unidirectional and may have weights associated with them. For example, a city road network can be seen as a graph where the roads are the edges, and intersections and points of interest are nodes. Some streets are one-way and some are not, some have tolls, some are closed at certain times of day, and so forth.

For transportation companies, there is much money to be made by optimizing the routes taken from one point to another. Different graph algorithms can derive such routes by taking into account attributes such as one-way streets and other costs expressed as weights that make a given road more attractive or less so.

For a more current example, think of the social graph popularized by sites such as Facebook where the nodes are people and the edges are the relationships between them.

Graphs and MapReduce – a match made somewhere

The main reason graphs don't look like many other MapReduce problems is due to the stateful nature of graph processing, which can be seen in the path-based relationship between elements and often between the large number of nodes processed together for a single algorithm. Graph algorithms tend to use notions of the global state to make determinations about which elements to process next and modify such global knowledge at each step.

In particular, most of the well-known algorithms often execute in an incremental or reentrant fashion, building up structures representing processed and pending nodes, and working through the latter while reducing the former.

MapReduce problems, on the other hand, are conceptually stateless and typically based upon a divide-and-conquer approach where each Hadoop worker host processes a small subset of the data, writing out a portion of the final result where the total job output is viewed as the simple collection of these smaller outputs. Therefore, when implementing graph algorithms in Hadoop, we need to express algorithms that are fundamentally stateful and conceptually single-threaded in a stateless parallel and distributed framework. That's the challenge!

Most of the well-known graph algorithms are based upon search or traversal of the graph, often to find routes—frequently ranked by some notion of cost—between nodes. The most fundamental graph traversal algorithms are depth-first search (DFS) and breadth-first search (BFS).The difference between the algorithms is the ordering in which a node is processed in relationship to its neighbors.

We will look at representing an algorithm that implements a specialized form of such a traversal; for a given starting node in the graph, determine the distance between it and every other node in the graph.

Note

As can be seen, the field of graph algorithms and theory is a huge one that we barely scratch the surface of here. If you want to find out more, the Wikipedia entry on graphs is a good starting point; it can be found at http://en.wikipedia.org/wiki/Graph_(abstract_data_type).

Representing a graph

The first problem we face is how to represent the graph in a way we can efficiently process using MapReduce. There are several well-known graph representations known as pointer-based, adjacency matrix, and adjacency list. In most implementations, these representations often assume a single process space with a global view of the whole graph; we need to modify the representation to allow individual nodes to be processed in discrete map and reduce tasks.

We'll use the graph shown here in the following examples. The graph does have some extra information that will be explained later.

Our graph is quite simple; it has only seven nodes, and all but one of the edges is bidirectional. We are also using a common coloring technique that is used in standard graph algorithms, as follows:

  • White nodes are yet to be processed
  • Gray nodes are currently being processed
  • Black nodes have been processed

As we process our graph in the following steps, we will expect to see the nodes move through these stages.

Time for action – representing the graph

Let's define a textual representation of the graph that we'll use in the following examples.

Create the following as graph.txt:

12,3,40C
21,4
31,5,6
41,2
53,6
63,5
76

What just happened?

We defined a file structure that will represent our graph, based somewhat on the adjacency list approach. We assumed that each node has a unique ID and the file structure has four fields, as follows:

  • The node ID
  • A comma-separated list of neighbors
  • The distance from the start node
  • The node status

In the initial representation, only the starting node has values for the third and fourth columns: its distance from itself is 0 and its status is "C", which we'll explain later.

Our graph is directional—more formally referred to as a directed graph—that is to say, if node 1 lists node 2 as a neighbor, there is only a return path if node 2 also lists node 1 as its neighbor. We see this in the graphical representation where all but one edge has an arrow on both ends.

Overview of the algorithm

Because this algorithm and corresponding MapReduce job is quite involved, we'll explain it before showing the code, and then demonstrate it in use later.

Given the previous representation, we will define a MapReduce job that will be executed multiple times to get the final output; the input to a given execution of the job will be the output from the previous execution.

Based on the color code described in the previous section, we will define three states for a node:

  • Pending: The node is yet to be processed; it is in the default state (white)
  • Currently processing: The node is being processed (gray)
  • Done: The final distance for the node has been determined (black)

The mapper

The mapper will read in the current representation of the graph and treat each node as follows:

  • If the node is marked as Done, it gives output with no changes.
  • If the node is marked as Currently processing, its state is changed to Done and gives output with no other changes. Each of its neighbors gives output as per the current record with its distance incremented by one, but with no neighbors; node 1 doesn't know node 2's neighbors, for example.
  • If the node is marked Pending, its state is changed to Currently processing and it gives output with no further changes.

The reducer

The reducer will receive one or more records for each node ID, and it will combine their values into the final output node record for that stage.

The general algorithm for the reducer is as follows:

  • A Done record is the final output and no further processing of the values is performed
  • For other nodes, the final output is built up by taking the list of neighbors, where it is to be found, and the highest distance and state

Iterative application

If we apply this algorithm once, we will get node 1 marked as Done, several more (its immediate neighbors) as Current, and a few others as Pending. Successive applications of the algorithm will see all nodes move to their final state; as each node is encountered, its neighbors are brought into the processing pipeline. We will show this later.

Time for action – creating the source code

We'll now see the source code to implement our graph traversal. Because the code is lengthy, we'll break it into multiple steps; obviously they should all be together in a single source file.

  1. Create the following as GraphPath.java with these imports:
    import java.io.* ;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    
    public class GraphPath
    {
  2. Create an inner class to hold an object-oriented representation of a node:
    // Inner class to represent a node
        public static class Node
        {
    // The integer node id
            private String id ;
    // The ids of all nodes this node has a path to
            private String neighbours ;
    // The distance of this node to the starting node
            private int distance ;
    // The current node state
            private String state ;
    
    // Parse the text file representation into a Node object
            Node( Text t)
            {
                String[] parts = t.toString().split("\t") ;
    this.id = parts[0] ;
    this.neighbours = parts[1] ;
                if (parts.length<3 || parts[2].equals(""))
    this.distance = -1 ;
                else
    this.distance = Integer.parseInt(parts[2]) ;
    
                if (parts.length< 4 || parts[3].equals(""))
    this.stae = "P" ;
                else
    this.state = parts[3] ;
            }
    
    // Create a node from a key and value object pair
            Node(Text key, Text value)
            {
                this(new Text(key.toString()+"\t"+value.toString())) ;
            }
    
            Public String getId()
            {return this.id ;
            }
    
            public String getNeighbours()
            {
                return this.neighbours ;
            }
    
            public int getDistance()
            {
                return this.distance ;
            }
    
            public String getState()
            {
                return this.state ;
            }
        }
  3. Create the mapper for the job. The mapper will create a new Node object for its input and then examine it, and based on its state do the appropriate processing.
        public static class GraphPathMapper
    extends Mapper<Object, Text, Text, Text>
    {
    
           public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException 
    {
             Node n = new Node(value) ;
    
             if (n.getState().equals("C"))
             {
    //  Output the node with its state changed to Done
                context.write(new Text(n.getId()), new Text(n.getNeighbours()+"\t"+n.getDistance()+"\t"+"D")) ;
    
                    for (String neighbour:n.getNeighbours().split(","))
                    {
    // Output each neighbour as a Currently processing node
    // Increment the distance by 1; it is one link further away
                        context.write(new Text(neighbour), new 
    Text("\t"+(n.getDistance()+1)+"\tC")) ;
                    }
                }
                else
                {
    // Output a pending node unchanged
                    context.write(new Text(n.getId()), new 
    Text(n.getNeighbours()+"\t"+n.getDistance()
    +"\t"+n.getState())) ;
                }
    
            }
        }
  4. Create the reducer for the job. As with the mapper, this reads in a representation of a node and gives as output a different value depending on the state of the node. The basic approach is to collect from the input the largest value for the state and distance columns, and through this converge to the final solution.
        public static class GraphPathReducer
    extends Reducer<Text, Text, Text, Text>
    {
    
            public void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException 
    {
    // Set some default values for the final output
                String neighbours = null ;
    int distance = -1 ;
    String state = "P" ;
    
                for(Text t: values)
    {
                    Node n = new Node(key, t) ;
    
                    if (n.getState().equals("D"))
                    {
    // A done node should be the final output; ignore the remaining 
    // values
    neighbours = n.getNeighbours() ;
                        distance = n.getDistance() ;
                        state = n.getState() ;
                        break ;
                    }
    
    // Select the list of neighbours when found                
                    if (n.getNeighbours() != null)
    neighbours = n.getNeighbours() ;
    
    // Select the largest distance
                    if (n.getDistance() > distance)
    distance = n.getDistance() ;
    
    // Select the highest remaining state
                    if (n.getState().equals("D") || 
    (n.getState().equals("C") &&state.equals("P")))
    state=n.getState() ;
                }
    
    // Output a new node representation from the collected parts        
                context.write(key, new 
    Text(neighbours+"\t"+distance+"\t"+state)) ;
            }
        }
  5. Create the job driver:
        public static void main(String[] args) throws Exception 
    {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "graph path");
            job.setJarByClass(GraphPath.class);
            job.setMapperClass(GraphPathMapper.class);
            job.setReducerClass(GraphPathReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

What just happened?

The job here implements the previously described algorithm that we'll execute in the following sections. The job setup is pretty standard, and apart from the algorithm definition the only new thing here is the use of an inner class to represent nodes.

The input to a mapper or reducer is often a flattened representation of a more complex structure or object. We could just use that representation, but in this case this would result in the mapper and reducer bodies being full of text and string manipulation code that would obscure the actual algorithm.

The use of the Node inner class allows the mapping from the flat file to object representation that is to be encapsulated in an object that makes sense in terms of the business domain. This also makes the mapper and reducer logic clearer as comparisons between object attributes are more semantically meaningful than comparisons with slices of a string identified only by absolute index positions.

Time for action – the first run

Let's now perform the initial execution of this algorithm on our starting representation of the graph:

  1. Put the previously created graph.txt file onto HDFS:
    $ hadoop fs -mkdirgraphin
    $ hadoop fs -put graph.txtgraphin/graph.txt
    
  2. Compile the job and create the JAR file:
    $ javac GraphPath.java
    $ jar -cvf graph.jar *.class
    
  3. Execute the MapReduce job:
    $ hadoop jar graph.jarGraphPathgraphingraphout1
    
  4. Examine the output file:
    $ hadoop fs –cat /home/user/hadoop/graphout1/part-r00000
    12,3,40D
    21,41C
    31,5,61C
    41,21C
    53,6-1P
    63,5-1P
    76-1P
    

What just happened?

After putting the source file onto HDFS and creating the job JAR file, we executed the job in Hadoop. The output representation of the graph shows a few changes, as follows:

  • Node 1 is now marked as Done; its distance from itself is obviously 0
  • Nodes 2, 3, and 4 – the neighbors of node 1 — are marked as Currently processing
  • All other nodes are Pending

Our graph now looks like the following figure:

Given the algorithm, this is to be expected; the first node is complete and its neighboring nodes, extracted through the mapper, are in progress. All other nodes are yet to begin processing.

Time for action – the second run

If we take this representation as the input to another run of the job, we would expect nodes 2, 3, and 4 to now be complete, and for their neighbors to now be in the Current state. Let's see; execute the following steps:

  1. Execute the MapReduce job by executing the following command:
    $ hadoop jar graph.jarGraphPathgraphout1graphout2
    
  2. Examine the output file:
    $ hadoop fs -cat /home/user/hadoop/graphout2/part-r000000
    12,3,40D
    21,41D
    31,5,61D
    41,21D
    53,62C
    63,52C
    76-1P
    

What just happened?

As expected, nodes 1 through 4 are complete, nodes 5 and 6 are in progress, and node 7 is still pending, as seen in the following figure:

If we run the job again, we should expect nodes 5 and 6 to be Done and any unprocessed neighbors to become Current.

Time for action – the third run

Let's validate that assumption by running the algorithm for the third time.

  1. Execute the MapReduce job:
    $ hadoop jar graph.jarGraphPathgraphout2graphout3
    
  2. Examine the output file:
    $ hadoop fs -cat /user/hadoop/graphout3/part-r-00000
    12,3,40D
    21,41D
    31,5,61D
    41,21D
    53,62D
    63,52D
    76-1P
    

What just happened?

We now see nodes 1 through 6 are complete. But node 7 is still pending and no nodes are currently being processed, as shown in the following figure:

The reason for this state is that though node 7 has a link to node 6, there is no edge in the reverse direction. Node 7 is therefore effectively unreachable from node 1. If we run the algorithm one final time, we should expect to see the graph unchanged.

Time for action – the fourth and last run

Let's perform the fourth execution to validate that the output has now reached its final stable state.

  1. Execute the MapReduce job:
    $ hadoop jar graph.jarGraphPathgraphout3graphout4
    
  2. Examine the output file:
    $ hadoop fs -cat /user/hadoop/graphout4/part-r-00000
    12,3,40D
    21,41D
    31,5,61D
    41,21D
    53,62D
    63,52D
    76-1P
    

What just happened?

The output is as expected; since node 7 is not reachable by node 1 or any of its neighbors, it will remain Pending and never be processed further. Consequently, our graph is unchanged as shown in the following figure:

The one thing we did not build into our algorithm was an understanding of a terminating condition; the process is complete if a run does not create any new D or C nodes.

The mechanism we use here is manual, that is, we knew by examination that the graph representation had reached its final stable state. There are ways of doing this programmatically, however. In a later chapter, we will discuss custom job counters; we can, for example, increment a counter every time a new D or C node is created and only reexecute the job if that counter is greater than zero after the run.

Running multiple jobs

The previous algorithm is the first time we have explicitly used the output of one MapReduce job as the input to another. In most cases, the jobs are different; but, as we have seen, there is value in repeatedly applying an algorithm until the output reaches a stable state.

Final thoughts on graphs

For anyone familiar with graph algorithms, the previous process will seem very alien. This is simply a consequence of the fact that we are implementing a stateful and potentially recursive global and reentrant algorithm as a series of serial stateless MapReduce jobs. The important fact is not in the particular algorithm used; the lesson is in how we can take flat text structures and a series of MapReduce jobs, and from this implement something like graph traversal. You may have problems that at first don't appear to have any way of being implemented in the MapReduce paradigm; consider some of the techniques used here and remember that many algorithms can be modeled in MapReduce. They may look very different from the traditional approach, but the goal is the correct output and not an implementation of a known algorithm.

Using language-independent data structures

A criticism often leveled at Hadoop, and which the community has been working hard to address, is that it is very Java-centric. It may appear strange to accuse a project fully implemented in Java of being Java-centric, but the consideration is from a client's perspective.

We have shown how Hadoop Streaming allows the use of scripting languages to implement map and reduce tasks and how Pipes provides similar mechanisms for C++. However, one area that does remain Java-only is the nature of the input formats supported by Hadoop MapReduce. The most efficient format is SequenceFile, a binary splittable container that supports compression. However, SequenceFiles have only a Java API; they cannot be written or read in any other language.

We could have an external process creating data to be ingested into Hadoop for MapReduce processing, and the best way we could do this is either have it simply as an output of text type or do some preprocessing to translate the output format into SequenceFiles to be pushed onto HDFS. We also struggle here to easily represent complex data types; we either have to flatten them to a text format or write a converter across two binary formats, neither of which is an attractive option.

Candidate technologies

Fortunately, there have been several technologies released in recent years that address the question of cross-language data representations. They are Protocol Buffers (created by Google and hosted at Thrift (originally created by Facebook and now an Apache project at and Avro (created by Doug Cutting, the original creator of Hadoop). Given its heritage and tight Hadoop integration, we will use Avro to explore this topic. We won't cover Thrift or Protocol Buffers in this book, but both are solid technologies; if the topic of data serialization interests you, check out their home pages for more information.

Introducing Avro

Avro, with its home page at a data-persistence framework with bindings for many programming languages. It creates a binary structured format that is both compressible and splittable, meaning it can be efficiently used as the input to MapReduce jobs.

Avro allows the definition of hierarchical data structures; so, for example, we can create a record that contains an array, an enumerated type, and a subrecord. We can create these files in any programming language, process them in Hadoop, and have the result read by a third language.

We'll talk about these aspects of language independence over the next sections, but this ability to express complex structured types is also very valuable. Even if we are using only Java, we could employ Avro to allow us to pass complex data structures in and out of mappers and reducers. Even things like graph nodes!

Time for action – getting and installing Avro

Let's download Avro and get it installed on our system.

  1. Download the latest stable version of Avro from http://avro.apache.org/releases.html.
  2. Download the latest version of the ParaNamer library from http://paranamer.codehaus.org.
  3. Add the classes to the build classpath used by the Java compiler.
    $ export CLASSPATH=avro-1.7.2.jar:${CLASSPATH}
    $ export CLASSPATH=avro-mapred-1.7.2.jar:${CLASSPATH}
    $ export CLASSPATH=paranamer-2.5.jar:${CLASSPATH
    
  4. Add existing JAR files from the Hadoop distribution to the build classpath.
    Export CLASSPATH=${HADOOP_HOME}/lib/Jackson-core-asl-1.8.jar:${CLASSPATH}
    Export CLASSPATH=${HADOOP_HOME}/lib/Jackson-mapred-asl-1.8.jar:${CLASSPATH}
    Export CLASSPATH=${HADOOP_HOME}/lib/commons-cli-1.2.jar:${CLASSPATH}
    
  5. Add the new JAR files to the Hadoop lib directory.
    $cpavro-1.7.2.jar ${HADOOP_HOME}/lib
    $cpavro-1.7.2.jar ${HADOOP_HOME}/lib
    $cpavro-mapred-1.7.2.jar ${HADOOP_HOME}/lib
    

What just happened?

Setting up Avro is a little involved; it is a much newer project than the other Apache tools we'll be using, so it requires more than a single download of a tarball.

We download the Avro and Avro-mapred JAR files from the Apache website. There is also a dependency on ParaNamer that we download from its home page at http://codehaus.org.

Note

The ParaNamer home page has a broken download link at the time of writing; as an alternative, try the following link:

http://search.maven.org/remotecontent?filepath=com/thoughtworks/paranamer/paranamer/2.5/paranamer-2.5.jar

After downloading these JAR files, we need to add them to the classpath used by our environment; primarily for the Java compiler. We add these files, but we also need to add to the build classpath several packages that ship with Hadoop because they are required to compile and run Avro code.

Finally, we copy the three new JAR files into the Hadoop lib directory on each host in the cluster to enable the classes to be available for the map and reduce tasks at runtime. We could distribute these JAR files through other mechanisms, but this is the most straightforward means.

Avro and schemas

One advantage Avro has over tools such as Thrift and Protocol Buffers, is the way it approaches the schema describing an Avro datafile. While the other tools always require the schema to be available as a distinct resource, Avro datafiles encode the schema in their header, which allows for the code to parse the files without ever seeing a separate schema file.

Avro supports but does not require code generation that produces code tailored to a specific data schema. This is an optimization that is valuable when possible but not a necessity.

We can therefore write a series of Avro examples that never actually use the datafile schema, but we'll only do that for parts of the process. In the following examples, we will define a schema that represents a cut-down version of the UFO sighting records we used previously.

Time for action – defining the schema

Let's now create this simplified UFO schema in a single Avro schema file.

Create the following as ufo.avsc:

{ "type": "record",
  "name": "UFO_Sighting_Record",
  "fields" : [
    {"name": "sighting_date", "type": "string"},
    {"name": "city", "type": "string"},
    {"name": "shape", "type": ["null", "string"]}, 
    {"name": "duration", "type": "float"}
] 
}

What just happened?

As can be seen, Avro uses JSON in its schemas, which are usually saved with the .avsc extension. We create here a schema for a format that has four fields, as follows:

  • The Sighting_date field of type string to hold a date of the form yyyy-mm-dd
  • The City field of type string that will contain the city's name where the sighting occurred
  • The Shape field, an optional field of type string, that represents the UFO's shape
  • The Duration field gives a representation of the sighting duration in fractional minutes

With the schema defined, we will now create some sample data.

Time for action – creating the source Avro data with Ruby

Let's create the sample data using Ruby to demonstrate the cross-language capabilities of Avro.

  1. Add the rubygems package:
    $ sudo apt-get install rubygems
    
  2. Install the Avro gem:
    $ gem install avro
    
  3. Create the following as generate.rb:
    require 'rubygems'
    require 'avro'
    
    file = File.open('sightings.avro', 'wb')
    schema = Avro::Schema.parse(
    File.open("ufo.avsc", "rb").read)
    
    writer = Avro::IO::DatumWriter.new(schema)
    dw = Avro::DataFile::Writer.new(file, writer, schema)
    dw<< {"sighting_date" => "2012-01-12", "city" => "Boston", "shape" => "diamond", "duration" => 3.5}
    dw<< {"sighting_date" => "2011-06-13", "city" => "London", "shape" => "light", "duration" => 13}
    dw<< {"sighting_date" => "1999-12-31", "city" => "New York", "shape" => "light", "duration" => 0.25}
    dw<< {"sighting_date" => "2001-08-23", "city" => "Las Vegas", "shape" => "cylinder", "duration" => 1.2}
    dw<< {"sighting_date" => "1975-11-09", "city" => "Miami", "duration" => 5}
    dw<< {"sighting_date" => "2003-02-27", "city" => "Paris", "shape" => "light", "duration" => 0.5}
    dw<< {"sighting_date" => "2007-04-12", "city" => "Dallas", "shape" => "diamond", "duration" => 3.5}
    dw<< {"sighting_date" => "2009-10-10", "city" => "Milan", "shape" => "formation", "duration" => 0}
    dw<< {"sighting_date" => "2012-04-10", "city" => "Amsterdam", "shape" => "blur", "duration" => 6}
    dw<< {"sighting_date" => "2006-06-15", "city" => "Minneapolis", "shape" => "saucer", "duration" => 0.25}
    dw.close
  4. Run the program and create the datafile:
    $ ruby generate.rb
    

What just happened?

Before we use Ruby, we ensure the rubygems package is installed on our Ubuntu host. We then install the preexisting Avro gem for Ruby. This provides the libraries we need to read and write Avro files from, within the Ruby language.

The Ruby script itself simply reads the previously created schema and creates a datafile with 10 test records. We then run the program to create the data.

This is not a Ruby tutorial, so I will leave analysis of the Ruby API as an exercise for the reader; its documentation can be found at http://rubygems.org/gems/avro.

Time for action – consuming the Avro data with Java

Now that we have some Avro data, let's write some Java code to consume it:

  1. Create the following as InputRead.java:
    import java.io.File;
    import java.io.IOException;
    
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro. generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    
    public class InputRead
    {
        public static void main(String[] args) throws IOException
        {
            String filename = args[0] ;
    
            File file=new File(filename) ;
    DatumReader<GenericRecord> reader= new 
    GenericDatumReader<GenericRecord>();
    DataFileReader<GenericRecord>dataFileReader=new 
    DataFileReader<GenericRecord>(file,reader);
    
            while (dataFileReader.hasNext())
            {
    GenericRecord result=dataFileReader.next();
                String output = String.format("%s %s %s %f",
    result.get("sighting_date"), result.get("city"), 
    result.get("shape"), result.get("duration")) ;
    System.out.println(output) ;
            }
        }
    }
  2. Compile and run the program:
    $ javacInputRead.java
    $ java InputReadsightings.avro
    

    The output will be as shown in the following screenshot:

What just happened?

We created the Java class InputRead, which takes the filename passed as a command-line argument and parses this as an Avro datafile. When Avro reads from a datafile, each individual element is called a datum and each datum will follow the structure defined in the schema.

In this case, we don't use an explicit schema; instead, we read each datum into the GenericRecord class, and from this extract each field by explicitly retrieving it by name.

The GenericRecord class is a very flexible class in Avro; it can be used to wrap any record structure, such as our UFO-sighting type. Avro also supports primitive types such as integers, floats, and booleans as well as other structured types such as arrays and enums. In these examples, we'll use records as the most common structure, but this is only a convenience.

Using Avro within MapReduce

Avro's support for MapReduce revolves around several Avro-specific variants of other familiar classes, whereas we'd normally expect a new datafile format to be supported in Hadoop through new InputFormat and OutputFormat classes, we'll use AvroJob , AvroMapper , and AvroReducer instead of the non-Avro versions. AvroJob expects Avro datafiles as its input and output, so instead of specifying input and output format types, we configure it with details of the input and output Avro schemas.

The main difference for our mapper and reducer implementations are the types used. Avro, by default, has a single input and output, whereas we're used to our Mapper and Reducer classes having a key/value input and a key/value output. Avro also introduces the Pair class, which is often used to emit intermediate key/value data.

Avro does also support AvroKey and AvroValue , which can wrap other types, but we'll not use those in the following examples.

Time for action – generating shape summaries in MapReduce

In this section we will write a mapper that takes as input the UFO sighting record we defined earlier. It will output the shape and a count of 1, and the reducer will take this shape and count records and produce a new structured Avro datafile type containing the final counts for each UFO shape. Perform the following steps:

  1. Copy the sightings.avro file to HDFS.
    $ hadoopfs -mkdiravroin
    $ hadoopfs -put sightings.avroavroin/sightings.avro
    
  2. Create the following as AvroMR.java:
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.*;
    import org.apache.avro.Schema.Type;
    import org.apache.avro.mapred.*;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.util.Utf8;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.io.* ;
    import org.apache.hadoop.util.*;
    
    // Output record definition
    class UFORecord
    {
    UFORecord()
        {
        }
    
        public String shape ;
        public long count ;
    }
    
    public class AvroMR extends Configured  implements Tool
    {
    // Create schema for map output
        public static final Schema PAIR_SCHEMA =         
    Pair.getPairSchema(Schema.create(Schema.Type.STRING), 
    Schema.create(Schema.Type.LONG));
    // Create schema for reduce output
        public final static Schema OUTPUT_SCHEMA = 
    ReflectData.get().getSchema(UFORecord.class);
    
        @Override
        public int run(String[] args) throws Exception
        {
    JobConfconf = new JobConf(getConf(), getClass());
    conf.setJobName("UFO count");
    
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2)
            {
    System.err.println("Usage: avro UFO counter <in><out>");
    System.exit(2);
    
            }
    
    FileInputFormat.addInputPath(conf, new Path(otherArgs[0]));
            Path outputPath = new Path(otherArgs[1]);
    FileOutputFormat.setOutputPath(conf, outputPath);
    outputPath.getFileSystem(conf).delete(outputPath);
            Schema input_schema = 
    Schema.parse(getClass().getResourceAsStream("ufo.avsc"));
    AvroJob.setInputSchema(conf, input_schema);
    AvroJob.setMapOutputSchema(conf,           
    Pair.getPairSchema(Schema.create(Schema.Type.STRING), 
    Schema.create(Schema.Type.LONG)));
    
    AvroJob.setOutputSchema(conf, OUTPUT_SCHEMA);
    AvroJob.setMapperClass(conf, AvroRecordMapper.class);
    AvroJob.setReducerClass(conf, AvroRecordReducer.class);
    conf.setInputFormat(AvroInputFormat.class) ;
    JobClient.runJob(conf);
    
            return 0 ;
        }
    
        public static class AvroRecordMapper extends 
    AvroMapper<GenericRecord, Pair<Utf8, Long>>
        {
            @Override
            public void map(GenericRecord in, AvroCollector<Pair<Utf8, 
    Long>> collector, Reporter reporter) throws IOException
            {
                Pair<Utf8,Long> p = new Pair<Utf8,Long>(PAIR_SCHEMA) ;
    Utf8 shape = (Utf8)in.get("shape") ;
                if (shape != null)
                {
    p.set(shape, 1L) ;
    collector.collect(p);
                }
            }
        }
    
        public static class AvroRecordReducer extends AvroReducer<Utf8, 
    Long, GenericRecord>
        {
            public void reduce(Utf8 key, Iterable<Long> values, 
    AvroCollector<GenericRecord> collector,  
                Reporter reporter) throws IOException
            {
                long sum = 0;
                for (Long val : values)
                {
                    sum += val;
                }
    
    GenericRecord value = new 
    GenericData.Record(OUTPUT_SCHEMA);
    value.put("shape", key);
    value.put("count", sum);
    
    collector.collect(value);
            }
        }
    
        public static void main(String[] args) throws Exception
        {
    int res = ToolRunner.run(new Configuration(), new AvroMR(),
    args);
    System.exit(res);
        } 
    }
  3. Compile and run the job:
    $ javacAvroMR.java
    $ jar -cvfavroufo.jar *.class ufo.avsc
     $ hadoop jar ~/classes/avroufo.jarAvroMRavroinavroout
    
  4. Examine the output directory:
    $ hadoopfs -lsavroout
    Found 3 items
    -rw-r--r-- 1 … /user/hadoop/avroout/_SUCCESS
    drwxr-xr-x - hadoopsupergroup 0 … /user/hadoop/avroout/_logs
    -rw-r--r-- 1 … /user/hadoop/avroout/part-00000.avro
    
  5. Copy the output file to the local filesystem:
    $ hadoopfs -get /user/hadoop/avroout/part-00000.avroresult.avro
    

What just happened?

We created the Job class and examined its various components. The actual logic within the Mapper and Reducer classes is relatively straightforward: the Mapper class just extracts the shape column and emits it with a count of 1; the reducer then counts the total number of entries for each shape. The interesting aspects are around the defined input and output types to the Mapper and Reducer classes and how the job is configured.

The Mapper class has an input type of GenericRecord and an output type of Pair. The Reducer class has a corresponding input type of Pair and output type of GenericRecord.

The GenericRecord class passed to the Mapper class wraps a datum that is the UFO sighting record represented in the input file. This is how the Mapper class is able to retrieve the Shape field by name.

Recall that GenericRecords may or may not be explicitly created with a schema, and in either case the structure can be determined from the datafile. For the GenericRecord output by the Reducer class, we do pass a schema but use a new mechanism for its creation.

Within the previously mentioned code, we created the additional UFORecord class and used Avro reflection to generate its schema dynamically at runtime. We were then able to use this schema to create a GenericRecord class specialized to wrap that particular record type.

Between the Mapper and Reducer classes we use the Avro Pair type to hold a key and value pair. This allows us to express the same logic for the Mapper and Reducer classes that we used in the original WordCount example back in Chapter 2, Getting Hadoop Up and Running; the Mapper class emits singleton counts for each value and the reducer sums these into an overall total for each shape.

In addition to the Mapper and Reducer classes' input and output, there is some configuration unique to a job processing Avro data:

Schema input_schema = Schema.parse(getClass().getResourceAsStream("ufo.avsc")) ;
AvroJob.setInputSchema(conf, input_schema);
AvroJob.setMapOutputSchema(conf,           Pair.getPairSchema(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG)));

AvroJob.setOutputSchema(conf, OUTPUT_SCHEMA);
AvroJob.setMapperClass(conf, AvroRecordMapper.class);
AvroJob.setReducerClass(conf, AvroRecordReducer.class);

These configuration elements demonstrate the criticality of schema definition to Avro; though we can do without it, we must set the expected input and output schema types. Avro will validate the input and output against the specified schemas, so there is a degree of data type safety. For the other elements, such as setting up the Mapper and Reducer classes, we simply set those on AvroJob instead of the more generic classes, and once done, the MapReduce framework will perform appropriately.

This example is also the first time we've explicitly implemented the Tool interface. When running the Hadoop command-line program, there are a series of arguments (such as -D) that are common across all the multiple subcommands. If a job class implements the Tool interface as mentioned in the previous section, it automatically gets access to any of these standard options passed on the command line. It's a useful mechanism that prevents lots of code duplication.

Time for action – examining the output data with Ruby

Now that we have the output data from the job, let's examine it again using Ruby.

  1. Create the following as read.rb:
    require 'rubygems'
    require 'avro'
    
    file = File.open('res.avro', 'rb')
    reader = Avro::IO::DatumReader.new()
    dr = Avro::DataFile::Reader.new(file, reader)
    
    dr.each {|record|  
    print record["shape"]," ",record["count"],"\n"
    }
    dr.close
  2. Examine the created result file.
    $ ruby read.rb
    blur 1
    cylinder 1
    diamond 2
    formation 1
    light 3
    saucer 1
    

What just happened?

As before, we'll not analyze the Ruby Avro API. The example created a Ruby script that opens an Avro datafile, iterates through each datum, and displays it based on explicitly named fields. Note that the script does not have access to the schema for the datafile; the information in the header provides enough data to allow each field to be retrieved.

Time for action – examining the output data with Java

To show that the data is accessible from multiple languages, let's also display the job output using Java.

  1. Create the following as OutputRead.java:
    import java.io.File;
    import java.io.IOException;
    
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro. generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    
    public class OutputRead
    {
        public static void main(String[] args) throws IOException
        {
            String filename = args[0] ;
    
            File file=new File(filename) ;
    DatumReader<GenericRecord> reader= new 
    GenericDatumReader<GenericRecord>();
    DataFileReader<GenericRecord>dataFileReader=new 
    DataFileReader<GenericRecord>(file,reader);
    
            while (dataFileReader.hasNext())
            {
    GenericRecord result=dataFileReader.next();
                String output = String.format("%s %d",
    result.get("shape"), result.get("count")) ;
    System.out.println(output) ;
            }
        }
    }
  2. Compile and run the program:
    $ javacOutputResult.java
    $ java OutputResultresult.avro
    blur 1
    cylinder 1
    diamond 2
    formation 1
    light 3
    saucer 1
    

What just happened?

We added this example to show the Avro data being read by more than one language. The code is very similar to the earlier InputRead class; the only difference is that the named fields are used to display each datum as it is read from the datafile.

Have a go hero – graphs in Avro

As previously mentioned, we worked hard to reduce representation-related complexity in our GraphPath class. But with mappings to and from flat lines of text and objects, there was an overhead in managing these transformations.

With its support for nested complex types, Avro can natively support a representation of a node that is much closer to the runtime object. Modify the GraphPath class job to read and write the graph representation to an Avro datafile comprising of datums for each node. The following example schema may be a good starting point, but feel free to enhance it:

{ "type": "record",
  "name": "Graph_representation",
  "fields" : [
{"name": "node_id", "type": "int"},
    {"name": "neighbors", "type": "array", "items:"int" },
    {"name": "distance", "type": "int"},
  {"name": "status", "type": "enum", 
"symbols": ["PENDING", "CURRENT", "DONE"
},]
] 
}

Going forward with Avro

There are many features of Avro we did not cover in this case study. We focused only on its value as an at-rest data representation. It can also be used within a remote procedure call (RPC) framework and can optionally be used as the default RPC format in Hadoop 2.0. We didn't use Avro's code generation facilities that produce a much more domain-focused API. Nor did we cover issues such as Avro's ability to support schema evolution that, for example, allows new fields to be added to recent records without invalidating old datums or breaking existing clients. It's a technology you are very likely to see more of in the future.

Summary

This chapter has used three case studies to highlight some more advanced aspects of Hadoop and its broader ecosystem. In particular, we covered the nature of join-type problems and where they are seen, how reduce-side joins can be implemented with relative ease but with an efficiency penalty, and how to use optimizations to avoid full joins in the map-side by pushing data into the Distributed Cache.

We then learned how full map-side joins can be implemented, but require significant input data processing; how other tools such as Hive and Pig should be investigated if joins are a frequently encountered use case; and how to think about complex types like graphs and how they can be represented in a way that can be used in MapReduce.

We also saw techniques for breaking graph algorithms into multistage MapReduce jobs, the importance of language-independent data types, how Avro can be used for both language independence as well as complex Java-consumed types, and the Avro extensions to the MapReduce APIs that allow structured types to be used as the input and output to MapReduce jobs.

This now concludes our coverage of the programmatic aspects of the Hadoop MapReduce framework. We will now move on in the next two chapters to explore how to manage and scale a Hadoop environment.