2.5 Google的成功之道——MapReduce算法
MapReduce算法是处理/产生海量数据集的编程模型。在MapReduce算法中,用户指定一个map()函数,通过这个map()函数处理键/值(key/value)对,产生一系列的中间键/值(key/value)对,并且使用一个reduce()函数来合并具有相同键(key)的中间键值(key/value)对中的值(value)。现实生活中,很多任务的实现都是基于这个模式的。
使用MapReduce算法的程序可以将任务自动分布到一个由普通机器组成的超大规模集群上并发执行。大多数实现MapReduce算法的框架(比如Hadoop)都会解决输入数据的分布细节,跨越机器集群的程序执行调度,处理机器的失效和机器之间的通信等问题。使用框架来实现MapReduce算法,程序员不需要有并发处理或者分布式系统的经验,就可以处理超大规模的分布式系统的资源。
考虑这样一个例子,在很大的文档集合中统计每个单词出现的次数。写出如下类似的伪代码:
map(String key, String value): // key: 文档名 // value:文档内容 for each word w in value: EmitIntermediate(w, "1"); reduce(String key, Iterator values): // key: a word // values: 一个针对这个word的计数列表(每个列表记录文档中word出现的次数) int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
Map()函数检查每个单词,并且每检查一次就将当前检查的单词的计数器加1。reduce()函数将特定单词出现的次数进行合并。
上面的例子是用字符串作为输入和输出的,从概念上讲,map()和reduce()函数的输入/输出类型遵循以下原则。
map (k1,v1) → list(k2,v2) reduce (k2,list(v2)) → list(v2)
也就是说,输入的键(key)值和输出的键(key)值属于不同的域,而中间的键值和输出的键值属于相同的域。
MapReduce算法在实际的系统中使用非常广泛,下面是一些简单有趣的例子,它们都可以使用MapReduce算法来进行计算。
● 分布式Grep:如果map()函数检查输入行满足条件,则map()函数就把本行输出。reduce()函数就是一个直通函数,简单地把中间数据输出就可以了。
● URL访问频率统计:map()函数处理请求和应答(URL,1)的log。reduce()函数把所有相同的URL的值合并,并且输出一个成对的(URL,总个数)。
● 逆向Web-Link图:map()函数输出所有包含指向目标URL的网页,用(目标URL,源URL)这样的结构对输出。reduce()函数聚合所有关联相同目标URL的列表、源URL并且输出一个(目标URL,list(源URL))的结构。
● 主机关键向量指标(Term-Vector per Hosts):关键词向量指标简而言之就是指在一个文档或者一组文档中的重点词出现的频率,用(word,frequency)表达。map()函数计算每一个输入文档(主机名字是从文档的URL取出)的关键词向量,然后输出(hostname,关键词向量(Term-Vector))。reduce()函数处理所有相同主机的所有文档关键词向量,去掉不常用的关键词,并且输出最终的(hostname,关键词向量)对。
● 逆序索引:map()函数分析每一个文档,并且产生一个序列(word,documentID)组。reduce()函数处理指定word的所有序列组,并且对相关的document ID进行排序,输出一个(word,list(document ID))组。所有的输出组,组成一个简单的逆序索引。通过这种方法可以很容易地保持关键词在文档库中的位置。
● 分布式排序:map()函数从每条记录中抽取关键字,并且产生(key,record)对。reduce()函数原样输出所有的关键字对。
2.5.1 详解MapReduce算法
map()函数把输入数据进行切割(比如分为M块)之后,分布到不同的机器上执行(例如前面介绍的单词统计例子,可以把每一个文件分配到一台机器上执行)。reduce()函数通过产生的键key(例如可以根据某种分区函数——比如hash(key)mod R),R的值和分区函数都是由用户指定)将map()的结果集分成R块,然后分别在R台机器上执行。
图2.15是MapReduce算法示意图。当用户程序调用MapReduce函数时,就会引起如下的操作。
MapReduce函数库首先把输入文件分成M块,每块大概16~64MB。接着在集群的机器上执行处理程序。
如图2.15所示,MapReduce算法运行过程中有一个主控程序,称为Master。主控程序会产生很多作业程序,称为worker,并且把M个map任务和R个reduce任务分配给这些worker,让它们去完成。
图2.15 MapReduce运行机制
被分配了map任务的worker读取并处理相关的输入(这里的输入是指已经被切割的输入小块splite)。它处理输入的数据,并且将分析出的键/值(key/value)对传递给用户定义的reduce()函数。map()函数产生的中间结果键/值(key/value)对暂时缓冲到内存。
map()函数缓冲到内存的中间结果将被定时刷写到本地硬盘,这些数据通过分区函数分成R个区。这些中间结果在本地硬盘的位置信息将被发送回master,然后这个master负责把这些位置信息传送给reduce()函数的worker。
当master通知了reduce()函数的worker关于中间键/值(key/value)对的位置时,worker调用远程方法从map()函数的worker机器的本地硬盘上读取缓冲的中间数据。当reduce()函数的worker读取到所有的中间数据后,它就使用这些中间数据的键(key)进行排序,这样可以使得相同键(key)的值都在一起。如果中间结果集太大了,那么就需要使用外排序。
reduce()函数的worker根据每一个中间结果的键(key)来遍历排序后的数据,并且把键(key)和相关的中间结果值(value)集合传递给reduce()函数。reduce()函数的worker最终把输出结果存放在master机器的一个输出文件中。
当所有的map任务和reduce任务都已经完成后,master激活用户程序。在这时,MapReduce返回用户程序的调用点。
当以上步骤成功结束以后,MapReduce的执行数据存放在总计R个输出文件中(每个输出文件都是由reduce任务产生的,这些文件名是用户指定的)。通常,用户不需要将这R个输出文件合并到一个文件,他们通常把这些文件作为输入传递给另一个MapReduce调用,或者用另一个分布式应用来处理这些文件,并且这些分布式应用把这些文件看成为输入文件由于分区(partition)而成为多个块文件。
2.5.2 MapReduce容错处理
由于MapReduce函数库是设计用于在成百上千台机器上处理海量数据的,所以这个函数库必须考虑到机器故障的容错处理。下面就详细介绍MapReduce的容错处理机制。
如图2.14所示,master会定期发送命令轮询每一台worker机器。如果在一定时间内有一台worker机器一直没有响应,master就认为这个worker失效了。所有这台worker完成的map任务都被设置成为它们的初始空闲状态,因此可以被其他worker调度执行。类似的,所有这个机器上正在处理的map任务或者reduce任务都被设置成为空闲状态,被其他worker重新执行。
在失效机器上的已经完成的map任务还需要再次重新执行,这是因为中间结果存放在这个失效的机器上,所以导致中间结果无法访问。已经完成的recude任务无需再次执行,因为它们的结果已经保存在全局的文件系统中了。
当map任务首先由A worker执行,随后被B worker执行的时候(因为A机器失效了),所有执行reduce任务的worker都会被通知。所有还没有来得及从A上读取数据的worker都会从B上读取数据。
大多数MapReduce函数库都能够有效地支持很多worker失效的情况。比如,在一个网络例行维护时,可能导致每次大约有80台机器在几分钟之内不能访问。master会简单地使这些不能访问的worker上的工作再执行一次,并且继续调度进程,直到所有任务都完成。
在master中,会定期设定检查点(checkpoint)。如果master任务失效了,可以从上次最后一个检查点开始启动另一个master进程。
map和reduce任务的可靠性是由输出进行原子提交来完成的。每一个正在进行的任务把输出写到一个私有的临时文件中。当全部写完之后,进行提交操作,并把这些临时文件变为永久保存的文件。
2.5.3 MapReduce实现架构
当前,针对MapReduce算法的实现架构如图2.16所示。主应用程序(Main Application)协调其他实例进行map()或者reduce()操作,然后从每个reduce()操作中收集结果。
图2.16 通用MapReduce实现架构
主应用程序负责把基础的数据集分解到“桶”(bucket)中。桶的最佳大小依赖于应用、节点的数量和可用的I/O带宽。这些“桶”通常存储在磁盘上,如果有必要也可能分散到主存中,这依赖于具体的应用。“桶”将作为map()函数的输入。
主应用程序也负责调度和分散几个MapReduce的核心备份,这几个备份是完全一致的。每个核心备份中有一个控制者,这个控制者会持续跟踪每个map()和reduce()任务的状态,并且可以作为map()和reduce()任务之间路由中间结果的管道。每个map()任务处理器完全指派给“桶”,然后产生一个保存到共享存储区域(Shared Memory)的中间结果集。共享存储可以设计成分布缓存、磁盘或其他设备等形式。当一个新的中间结果被写入共享存储区域后,任务就向控制者发出通知,并提供指向其共享存储位置的句柄。
当新的中间结果可用时,控制者分配reduce()任务。这个任务通过应用独立的中间键值(key/value)来实现排序,使相同的数据能聚集在一起,以提供更快的检索。大块的结果集可以进行外部排序,reduce()任务遍历整个排序的数据,把唯一的键和分类的结果传递到用户的reduce()函数进行处理。
经过map()和reduce()的过程,当所有的“桶”都用完时,全部的reduce()任务都会通知控制者,以说明它们的结果产生了。控制者就向主应用程序发出检索这个结果的信号。主应用程序可能直接操作这些结果,或者重新分配到不同的MapReduce控制者和任务进行进一步的处理。
当前,很多企业应用系统都是建立在Java技术上,它们依赖于已有的文件系统、通信协议和应用栈。
一个基于Java的MapReduce实现应该考虑到已存在的数据存储设备,将来部署到结构里面支持哪种协议,有哪些内部API和支持部署哪种第三方产品(开源的或商业的)。图2.17显示了通常的架构是如何通过映射到已有的、健壮的Java开源架构来实现的。
图2.17 MapReduce Java开源实现架构
这个架构采用了已有的工具,比如Terracotta和Mule,它们经常用来架构企业级应用系统。以物理或虚拟系统形式存在的“白盒子”通过简单的配置和部署,即可设计成MapReduce群组中的一部分。为了提高效率,一个庞大的系统可以被分解到多个虚拟机器上,如果需要可以分配更多的节点。
Terracotta集群技术是map和reduce任务之间共享数据的良好选择,因为它把map()和reduce()函数之间的通信过程,包括共享文件或者使用RPC调用以及初始处理结构都作了良好的封装。
从前面的描述可知,map()和reduce()任务是在同一个核心应用中实现的。用来共享中间结构集的数据结构可以保持在内存的数据结构中,通过Terracotta透明地共享交换。
由跨域集群的MapReduce产生的进程内通信问题,自从Terracotta在运行时掌管着这些共享数据结构后就不存在了。在Terracotta中,所有的map()任务都需要标记内存中的中间结果集,然后reduce()任务根据标记直接在内存中提取它们。
控制者和主应用程序都是通过Mule的ESB传递信号的。通过主流的企业应用协议或者完全的原始TCP/IP Sockets,Mule支持在内存中进行同步和异步的数据传输。Mule可用于在同一台机器执行的应用系统、跨越不同的数据中心或者完全不同的地方且被程序员分开标识的本地终端节点之间传递输出结构集。
大多数Java语言的MapReduce算法的实现都是基于Hadoop的。Hadoop是一个开源的点对点、通用的MapReduce实现。有关它的介绍,我们在下一节详细讲解。
2.5.4 Hadoop中的MapReduce简介
Hadoop是Apache下的一个分布式并行计算框架。Hadoop的核心设计思想是MapReduce和HDFS,MapReduce在前文中已经作了详细的介绍;而HDFS是Hadoop Distributed File System的缩写,即Hadoop的分布式文件系统,它也是基于我们前面介绍的Google File System的原理开发的,它为分布式计算存储提供底层支持。
Hadoop官方文档介绍了Hadoop中MapReduce的三个步骤:map(主要是分解并行的任务)、combine(主要是为了提高reduce的效率)和reduce(把处理后的结果再汇总起来)。
1. map
由于map是并行地对输入的文件集进行操作,所以它的第一步(FileSplit)就是把文件集分割成一些子集。当单个的文件大到影响查找效率时,它会被分割成一些小的文件。要指出的是,分割这一步是不知道输入文件的内部逻辑结构的。比如,以行为逻辑分割的文本文件会被以任意的字节界限分割,所以具体分割要由用户自己指定,然后每个文件分割体都会对应地有一个新的map任务。
当单个map任务开始时,它会对每个配置过的reduce任务开启一个新的输出流(writer),这个输出流会读取文件分割体。Hadoop中的类InputFormat用于分析输入文件并产生键值(key/value)对。
Hadoop中的Mapper类是一个可以由用户实现的类,经过InputFormat类分析的键值(key/value)对都传给Mapper类,这样,用户提供的Mapper类就可以进行真正的map操作。
当map操作的输出被收集后,它们会被Hadoop中的Partitioner类以指定的方式区分地写入输出文件里。
2. combine
当map操作输出它的键值(key/value)对时,出于性能和效率的考虑,Hadoop框架提供了一个合成器(combine)。有了这个合成器,map操作所产生的键值(key/value)对就不会马上被写入输出文件,它们会被收集在一些list中。一个key值对应一个list,当写入一定数量的键值(key/value)对时,这部分list会被合成器处理。
比如,Hadoop案例中的word count程序,它的map操作输出是(word,1)键值对,在map操作的输入中,词的计数可以使用合成器来加速。合成操作会在内存中收集处理list,一个词一个list。当一定数量的键值对被输出到内存中时,就调用合成操作的reduce()方法,每次都以一个唯一的词为key,values是list的迭代器,然后合成器输出(word,count-in-this-part-of-the-input)键值对。
3. reduce
当一个reduce任务开始时,它的输入分散在各个节点上的map的输出文件里。如果在分布式的模式下,需要先把这些文件复制到本地文件系统上。
一旦所有的数据都被复制到reduce任务所在的机器上时,reduce任务会把这些文件合并到一个文件中,然后这个文件会被合并分类,使得相同key的键值对可以排在一起。接下来的reduce操作就很简单了。这个文件会被顺序地读入,值(value)会从输入文件里用一个迭代器传给reduce()方法,直到下一个key。
最后,输出由每个reduce任务的输出文件组成,而它们的格式可以由JobConf.setOutputFormat类指定。
2.5.5 wordCount例子的实现
还记得我们在2.5节介绍的单词统计的例子么,它是学习MapReduce最好的例子。本节我们将使用Java语言来实现这个例子,其中用到了Hadoop开源云计算包(Hadoop相关API请参见Hadoop站点)。好了,先让我们来看看代码吧。
package org.myorg; import java.io.*; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { static enum Counters { INPUT_WORDS } private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive = true; private Set<String> patternsToSkip = new HashSet<String>(); private long numRecords = 0; private String inputFile; public void configure(JobConf job) { caseSensitive = job.getBoolean("wordcount.case.sensitive", true); inputFile = job.get("map.input.file"); if (job.getBoolean("wordcount.skip.patterns", false)) { Path[] patternsFiles = new Path[0]; try { patternsFiles = DistributedCache.getLocalCacheFiles(job); } catch (IOException ioe) { System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe)); } for (Path patternsFile : patternsFiles) { parseSkipFile(patternsFile); } } } private void parseSkipFile(Path patternsFile) { try { BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString())); String pattern = null; while ((pattern = fis.readLine()) != null) { patternsToSkip.add(pattern); } } catch (IOException ioe) { System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe)); } } public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = (caseSensitive) ? value.toString(): value.toString().toLowerCase(); for (String pattern : patternsToSkip) { line = line.replaceAll(pattern, ""); } StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); reporter.incrCounter(Counters.INPUT_WORDS, 1); } if ((++numRecords % 100) == 0) { reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); List<String> other_args = new ArrayList<String>(); for (int i=0; i < args.length; ++i) { if ("-skip".equals(args[i])) { DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf); conf.setBoolean("wordcount.skip.patterns", true); } else { other_args.add(args[i]); } } FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }
下面是WordCount的运行样例及结果。
输入样例:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/ /usr/joe/wordcount/input/file01 /usr/joe/wordcount/input/file02 $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01 Hello World, Bye World! $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02 Hello Hadoop, Goodbye to hadoop.
运行程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 Bye 1 Goodbye 1 Hadoop, 1 Hello 2 World! 1 World, 1 hadoop. 1 to 1
现在通过DistributedCache插入一个模式文件,文件中保存了要被忽略的单词模式。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt \. \, \! to
再运行一次,这次使用更多的选项。
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
应该得到这样的输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 Bye 1 Goodbye 1 Hadoop 1 Hello 2 World 2 hadoop 1
再运行一次,这一次关闭大小写敏感性(case sensitivity)。
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 bye 1 goodbye 1 hadoop 2 hello 2 world 2