大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

5.3 示例二:采用新API的WordCount

示例一采用的是老API,现在已经很少用了。Hadoop从1.0版之后就把重点转到了新API,现在只是为了保持兼容性才保留了老API。所以,现在Hadoop的代码中已经难以找到采用老API的示例了,而采用新API的示例则有很多。本节采用的示例是个比较典型的MapReduce应用WordCount,用来统计目标文件中的词频,就是不同的单词各出现了多少次,其源码文件在hadoop-mapreduce-proj ect/hadoop-mapreduce-examples/src目录下面,相对路径名为main/java/org/apache/hadoop/examples/WordCount.java。

作为一个可(在Java虚拟机JVM上)独立执行的“可执行程序”, WordCount也是个带有main()方法的Java类。我们先看一下它的摘要:

      class WordCount{}
      ]class TokenizerMapper extendsMapper<Object, Text, Text, IntWritable>{}
      ]]map(Obj ect key, Text value, Context context)
      ]class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{}
      ]]reduce(Text key, Iterable<IntWritable> values, Context context)
      ]main(String[]args)

这可以说是个最小、最简单,却又很典型的MapReduce应用示例。WordCount这个类中内嵌定义了两个类:TokenizerMapper和IntSumReducer。前者是对Mapper类的扩充,所以实质上就是一个Mapper,但是其中的map()方法是自己提供的;后者则是对Reducer类的扩充,所以本质上是个Reducer,但是其中的reduce()方法是自己提供的。这就是用户程序员要提交给Hadoop的自备Mapper和Reducer。除这二者之外,就只有一个main()函数了。

注意,TokenizerMapper所继承和扩充的类是Mapper,那是Hadoop提供的默认Mapper。如果你不提供Mapper,那么Hadoop就给你用上它所提供的默认Mapper。Mapper这个类是以模板的形式定义的:

      class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>{}

就是说在扩充或直接采用这个类的时候应该把抽象参数类型KEYIN、VALUEIN等根据实际需要落实为具体的类型。这里KEYIN和VALUEIN是Mapper的输入KV对(Key/Value对)的类型;KEYOUT和VALUEOUT则是其输出KV对的类型。这样,编译以后可以因具体的输入/输出KV对类型而生成各种不同的Mapper,这些Mapper的参数类型各不相同,但是程序的结构和流程都一样。在这里的WordCount中,KEYIN的类型落实为Object,实际上就是什么类型都可以(除无结构的原始类型如Int等之外),因为在Java语言中所有的类都是从Obj ect直接或间接继承扩充而来的;而VALUEIN则落实为Text。这个意思是:对于Mapper的输入KV对,任何类型都可以定义为Key的类型,但是Value的类型则一定是Text。

Mapper是这样,Reducer也是一样的道理,只不过Reducer的输入KV对的类型是Text/IntWritable。

由于本章的重点在于作业的提交过程,我们先把WordCount的Mapper和Reducer放一下,留待后面讲解Hadoop的MapReduce框架时再回头细究。现在我们把目光聚焦在它的main()方法。

      [WordCount.main()]


        public static void main(String[]args)throws Exception {
          Configuration conf=new Configuration();
          String[]otherArgs=new GenericOptionsParser(conf, args).getRemainingArgs();
          if (otherArgs.length < 2){
            System.err.println("Usage:wordcount <in> [<in>…]<out>");
            System.exit(2);
          }
          Job j ob=new Job(conf, "word count");
          job.setJarByClass(WordCount.class);
          job.setMapperClass(TokenizerMapper.class);
          job.setCombinerClass(IntSumReducer.class);
          job.setReducerClass(IntSumReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          for (int i=0; i < otherArgs.length-1; ++i){
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
          }
          FileOutputFormat.setOutputPath(job,
            new Path(otherArgs[otherArgs.length-1]));
          System.exit(job.waitForCompletion(true)?0 :1); //提交作业并等待其结束运行
      }

显然,除没有使用Combiner之外,这里做的事情跟示例一中所做的很相似,也是先准备好一份类似于作业单那样的东西,然后就把所需的材料,包括Mapper和Reducer,随同作业单一起提交给Hadoop。所不同的是,在采用老API的示例一中,作业单是个JobConf类的对象,提交作业时就调用JobClient.runJob()。而现在,在新API中,所用的作业单改成了一个Job类对象,要提交作业时就调用其waitForCompletion()方法,这个调用要到作业结束运行后才返回。这里的最后一行代码表示:对于Job.waitForCompletion()的调用,如果返回true就以0为参数调用System.exit(),以退出程序运行并让操作系统上的Shell知道这是正常结束;否则就以1为参数,让Shell知道运行失败。这是规矩,对于Shell脚本的使用很重要。

从表面上看,新老API之间的差别似乎微不足道,但如果深入了解就发现不是那么回事了。前面讲过,TokenizerMapper继承和扩充了Mapper类,所以本质上就是一个Mapper;IntSumReducer则继承和扩充了Reducer类。因为Mapper本来就是个类,就可以直接被用作默认的Mapper; Reducer也是一样。可是如果我们仔细看一下示例一中的ValueAggregatorMapper和ValueAggregatorReducer,则可以发现这二者都是继承和扩展了ValueAggregatorJobBase,而后者则实现了Mapper和Reducer两个界面。也就是说,在老API中Mapper和Reducer不是类而只是界面。这就是一个明显的不同,当然还有别的不同。

不过,如前所述,示例一中采用老API的作业提交,结果也走到了新API的Job类这一边,汇聚到Job.submit()。但是早期的Hadoop不是这样,这是有了新API以后才把老API嫁接过来的。因为这属于API的内部实现,所以这样的改变并不影响用户。

正因为老API已被嫁接到了新API上,我们在示例一中已经看到过Job类的一部分摘要,但是那时候我们还不关心waitForCompletion()和与此有关的内容,现在需要看看这个类的更详细的内容摘要了。

      class Job extends JobContextImpl implements JobContext {}
      ]public static enum JobState {DEFINE, RUNNING}
      ]static final longMAX_JOBSTATUS_AGE=1000 * 2
      ]static final String OUTPUT_FILTER="mapreduce.client.output.filter"
      ]static final String COMPLETION_POLL_INTERVAL_KEY=
                                            "mapreduce.client.completion.pollinterval"
      ]…
      ]static final int DEFAULT_MONITOR_POLL_INTERVAL=1000
      ]static final int DEFAULT_TASKLOG_TIMEOUT=60000
      ]…
      ]boolean waitForCompletion(boolean verbose)
      ]submit()
      ]setSpeculativeExecution()
      ]setInputFormatClass()
      ]setOutputFormatClass()
      ]setOutputKeyClass()
      ]setOutputValueClass()
      ]setMapOutputKeyClass()
      ]setMapOutputValueClass()
      ]setMapperClass()
      ]setCombinerClass()
      ]setReducerClass()
      ]getConfiguration()
      ]monitorAndPrintJob()
      ]printTaskEvents(TaskCompletionEvent[]events, …)

首先,在Job这个类中有不少静态的数据和类型定义。比方说这里定义了一个枚举类型JobState,其数值可以是DEFINE或RUNNING,分别表示作业当前是处于定义/准备阶段还是运行阶段。而常数MAX_JOBSTATUS_AGE设置成2000,表示最多2000毫秒就要刷新一下这个作业的状态。字符串常数OUTPUT_FILTER表明MapReduce的输出是否还需要有一层过滤,如果需要的话是什么过滤器,这是要以“mapreduce.client.output.filter”为键名去xml配置文件中查询的。

我们最关心的当然是方法waitForCompletion()的实现,下面是这个方法的代码。

      [WordCount.main()> Job.waitForCompletion()]


      public boolean waitForCompletion(boolean verbose)
                        throws IOException, InterruptedException, ClassNotFoundException {
        if (state==JobState.DEFINE){ //确保该作业只提交一次,提交后即变成RUNNING
          submit();  //通过Job.submit()走完作业提交的流程
        }
          //提交之后就监视其运行,直至结束
        if (verbose){
          monitorAndPrintJob(); //饶舌模式,周期性地报告作业的进展
        }else {             //要不然,就周期性地询问作业是否已完成
          //get the completion poll interval from the client.
          int completionPollIntervalMillis=Job.getCompletionPollInterval(cluster.getConf());
          while (! isComplete()){ //询问作业是否已经完成
            try{
              Thread.sleep(completionPollIntervalMillis); //尚未完成,睡一会儿
            }catch(InterruptedException ie){ //如果因中断而被唤醒,就继续下一轮循环
            }
          } //end while
        }
        return isSuccessful();
      }

这个方法,在其执行过程中有三种发生异常的可能,即IOException、InterruptedException、ClassNotFoundException。因为作业最终得要提交到中央节点,这就涉及节点之间的通信,如果通信出了问题而且不能恢复,那么作业就无法提交,更谈不上执行了,这时候JVM就会对所在的进程发起一次IOException异常,让其退出运行。另一方面,如果在执行过程中被别的线程或JVM中断,那也无法继续执行了。但是,这里有个例外,就是在下面while循环中的try块在睡眠期间发生中断对作业的执行没有什么影响。因为此时当前进程已经完成作业提交,只需过一会儿去了解一下情况,作业的实际执行已经不在这个进程上,即使发生中断也不影响作业的执行,所以这里要拦截捕捉(catch)中断异常并将其丢弃(空语句),不让它落入覆盖着整个方法的异常处理机制。最后,ClassNotFoundException发生于运行时要动态引用某个类的对象但是却找不到的情况下,那当然也无法再往下执行了。

从作业提交流程的角度看,这个方法的代码再简单不过了,实际就是对Job.submit()的调用,只是在调用之前要检查一下本作业是否处于DEFINE状态,以确保一个作业不会被提交多次。如上所述,JobState的值只有DEFINE和RUNNING两种,具体Job对象创建之初在构造函数Job()中将其设置成DEFINE,作业提交成功之后就将其改成RUNNING,这就把门关上了。

在正常的情况下,Job.submit()很快就会返回,因为这个方法的作用只是把作业提交上去,而无须等待作业的执行和完成。但是,在Job.submit()返回之后,Job.waitForCompletion()则要等待作业执行完成了以后才会返回。在等待期间,如果参数verbose为true,就要周期地报告作业执行的进展,或者就只是周期地检测作业是否已经完成。

读者也许会问,代码中对submit()并不检查其返回值,那万一在进一步提交作业的过程中出错呢?其实这就是前述三种异常处理的作用所在,因为要出问题也无非就是通信出错、找不到需要动态引用的类以及被强迫中断这么几种,执行中没有发生异常就是成功。

下面就是Job.submit()的事了,这就跟前面通过老API界面JobClient.runJob()提交作业的流程汇合了。