5.3 示例二:采用新API的WordCount
示例一采用的是老API,现在已经很少用了。Hadoop从1.0版之后就把重点转到了新API,现在只是为了保持兼容性才保留了老API。所以,现在Hadoop的代码中已经难以找到采用老API的示例了,而采用新API的示例则有很多。本节采用的示例是个比较典型的MapReduce应用WordCount,用来统计目标文件中的词频,就是不同的单词各出现了多少次,其源码文件在hadoop-mapreduce-proj ect/hadoop-mapreduce-examples/src目录下面,相对路径名为main/java/org/apache/hadoop/examples/WordCount.java。
作为一个可(在Java虚拟机JVM上)独立执行的“可执行程序”, WordCount也是个带有main()方法的Java类。我们先看一下它的摘要:
class WordCount{} ]class TokenizerMapper extendsMapper<Object, Text, Text, IntWritable>{} ]]map(Obj ect key, Text value, Context context) ]class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{} ]]reduce(Text key, Iterable<IntWritable> values, Context context) ]main(String[]args)
这可以说是个最小、最简单,却又很典型的MapReduce应用示例。WordCount这个类中内嵌定义了两个类:TokenizerMapper和IntSumReducer。前者是对Mapper类的扩充,所以实质上就是一个Mapper,但是其中的map()方法是自己提供的;后者则是对Reducer类的扩充,所以本质上是个Reducer,但是其中的reduce()方法是自己提供的。这就是用户程序员要提交给Hadoop的自备Mapper和Reducer。除这二者之外,就只有一个main()函数了。
注意,TokenizerMapper所继承和扩充的类是Mapper,那是Hadoop提供的默认Mapper。如果你不提供Mapper,那么Hadoop就给你用上它所提供的默认Mapper。Mapper这个类是以模板的形式定义的:
class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>{}
就是说在扩充或直接采用这个类的时候应该把抽象参数类型KEYIN、VALUEIN等根据实际需要落实为具体的类型。这里KEYIN和VALUEIN是Mapper的输入KV对(Key/Value对)的类型;KEYOUT和VALUEOUT则是其输出KV对的类型。这样,编译以后可以因具体的输入/输出KV对类型而生成各种不同的Mapper,这些Mapper的参数类型各不相同,但是程序的结构和流程都一样。在这里的WordCount中,KEYIN的类型落实为Object,实际上就是什么类型都可以(除无结构的原始类型如Int等之外),因为在Java语言中所有的类都是从Obj ect直接或间接继承扩充而来的;而VALUEIN则落实为Text。这个意思是:对于Mapper的输入KV对,任何类型都可以定义为Key的类型,但是Value的类型则一定是Text。
Mapper是这样,Reducer也是一样的道理,只不过Reducer的输入KV对的类型是Text/IntWritable。
由于本章的重点在于作业的提交过程,我们先把WordCount的Mapper和Reducer放一下,留待后面讲解Hadoop的MapReduce框架时再回头细究。现在我们把目光聚焦在它的main()方法。
[WordCount.main()] public static void main(String[]args)throws Exception { Configuration conf=new Configuration(); String[]otherArgs=new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2){ System.err.println("Usage:wordcount <in> [<in>…]<out>"); System.exit(2); } Job j ob=new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i=0; i < otherArgs.length-1; ++i){ FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0 :1); //提交作业并等待其结束运行 }
显然,除没有使用Combiner之外,这里做的事情跟示例一中所做的很相似,也是先准备好一份类似于作业单那样的东西,然后就把所需的材料,包括Mapper和Reducer,随同作业单一起提交给Hadoop。所不同的是,在采用老API的示例一中,作业单是个JobConf类的对象,提交作业时就调用JobClient.runJob()。而现在,在新API中,所用的作业单改成了一个Job类对象,要提交作业时就调用其waitForCompletion()方法,这个调用要到作业结束运行后才返回。这里的最后一行代码表示:对于Job.waitForCompletion()的调用,如果返回true就以0为参数调用System.exit(),以退出程序运行并让操作系统上的Shell知道这是正常结束;否则就以1为参数,让Shell知道运行失败。这是规矩,对于Shell脚本的使用很重要。
从表面上看,新老API之间的差别似乎微不足道,但如果深入了解就发现不是那么回事了。前面讲过,TokenizerMapper继承和扩充了Mapper类,所以本质上就是一个Mapper;IntSumReducer则继承和扩充了Reducer类。因为Mapper本来就是个类,就可以直接被用作默认的Mapper; Reducer也是一样。可是如果我们仔细看一下示例一中的ValueAggregatorMapper和ValueAggregatorReducer,则可以发现这二者都是继承和扩展了ValueAggregatorJobBase,而后者则实现了Mapper和Reducer两个界面。也就是说,在老API中Mapper和Reducer不是类而只是界面。这就是一个明显的不同,当然还有别的不同。
不过,如前所述,示例一中采用老API的作业提交,结果也走到了新API的Job类这一边,汇聚到Job.submit()。但是早期的Hadoop不是这样,这是有了新API以后才把老API嫁接过来的。因为这属于API的内部实现,所以这样的改变并不影响用户。
正因为老API已被嫁接到了新API上,我们在示例一中已经看到过Job类的一部分摘要,但是那时候我们还不关心waitForCompletion()和与此有关的内容,现在需要看看这个类的更详细的内容摘要了。
class Job extends JobContextImpl implements JobContext {} ]public static enum JobState {DEFINE, RUNNING} ]static final longMAX_JOBSTATUS_AGE=1000 * 2 ]static final String OUTPUT_FILTER="mapreduce.client.output.filter" ]static final String COMPLETION_POLL_INTERVAL_KEY= "mapreduce.client.completion.pollinterval" ]… ]static final int DEFAULT_MONITOR_POLL_INTERVAL=1000 ]static final int DEFAULT_TASKLOG_TIMEOUT=60000 ]… ]boolean waitForCompletion(boolean verbose) ]submit() ]setSpeculativeExecution() ]setInputFormatClass() ]setOutputFormatClass() ]setOutputKeyClass() ]setOutputValueClass() ]setMapOutputKeyClass()
]setMapOutputValueClass() ]setMapperClass() ]setCombinerClass() ]setReducerClass() ]getConfiguration() ]monitorAndPrintJob() ]printTaskEvents(TaskCompletionEvent[]events, …)
首先,在Job这个类中有不少静态的数据和类型定义。比方说这里定义了一个枚举类型JobState,其数值可以是DEFINE或RUNNING,分别表示作业当前是处于定义/准备阶段还是运行阶段。而常数MAX_JOBSTATUS_AGE设置成2000,表示最多2000毫秒就要刷新一下这个作业的状态。字符串常数OUTPUT_FILTER表明MapReduce的输出是否还需要有一层过滤,如果需要的话是什么过滤器,这是要以“mapreduce.client.output.filter”为键名去xml配置文件中查询的。
我们最关心的当然是方法waitForCompletion()的实现,下面是这个方法的代码。
[WordCount.main()> Job.waitForCompletion()] public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException { if (state==JobState.DEFINE){ //确保该作业只提交一次,提交后即变成RUNNING submit(); //通过Job.submit()走完作业提交的流程 } //提交之后就监视其运行,直至结束 if (verbose){ monitorAndPrintJob(); //饶舌模式,周期性地报告作业的进展 }else { //要不然,就周期性地询问作业是否已完成 //get the completion poll interval from the client. int completionPollIntervalMillis=Job.getCompletionPollInterval(cluster.getConf()); while (! isComplete()){ //询问作业是否已经完成 try{ Thread.sleep(completionPollIntervalMillis); //尚未完成,睡一会儿 }catch(InterruptedException ie){ //如果因中断而被唤醒,就继续下一轮循环 } } //end while } return isSuccessful(); }
这个方法,在其执行过程中有三种发生异常的可能,即IOException、InterruptedException、ClassNotFoundException。因为作业最终得要提交到中央节点,这就涉及节点之间的通信,如果通信出了问题而且不能恢复,那么作业就无法提交,更谈不上执行了,这时候JVM就会对所在的进程发起一次IOException异常,让其退出运行。另一方面,如果在执行过程中被别的线程或JVM中断,那也无法继续执行了。但是,这里有个例外,就是在下面while循环中的try块在睡眠期间发生中断对作业的执行没有什么影响。因为此时当前进程已经完成作业提交,只需过一会儿去了解一下情况,作业的实际执行已经不在这个进程上,即使发生中断也不影响作业的执行,所以这里要拦截捕捉(catch)中断异常并将其丢弃(空语句),不让它落入覆盖着整个方法的异常处理机制。最后,ClassNotFoundException发生于运行时要动态引用某个类的对象但是却找不到的情况下,那当然也无法再往下执行了。
从作业提交流程的角度看,这个方法的代码再简单不过了,实际就是对Job.submit()的调用,只是在调用之前要检查一下本作业是否处于DEFINE状态,以确保一个作业不会被提交多次。如上所述,JobState的值只有DEFINE和RUNNING两种,具体Job对象创建之初在构造函数Job()中将其设置成DEFINE,作业提交成功之后就将其改成RUNNING,这就把门关上了。
在正常的情况下,Job.submit()很快就会返回,因为这个方法的作用只是把作业提交上去,而无须等待作业的执行和完成。但是,在Job.submit()返回之后,Job.waitForCompletion()则要等待作业执行完成了以后才会返回。在等待期间,如果参数verbose为true,就要周期地报告作业执行的进展,或者就只是周期地检测作业是否已经完成。
读者也许会问,代码中对submit()并不检查其返回值,那万一在进一步提交作业的过程中出错呢?其实这就是前述三种异常处理的作用所在,因为要出问题也无非就是通信出错、找不到需要动态引用的类以及被强迫中断这么几种,执行中没有发生异常就是成功。
下面就是Job.submit()的事了,这就跟前面通过老API界面JobClient.runJob()提交作业的流程汇合了。