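Nanjing University of Posts and Telecommunications
Graduation Project (Thesis): Translation of Foreign-Language Material
School: School of Educational Science and Technology
Major: Educational Technology
Student name: 张耀武
Class and student number: B07070118
Source of the foreign-language text: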
http://portal.acm.org/citation.cfm?id=1327452.1327492&coll=GUIDE&dl=&idx=J79∂=ma&preflayout=flat
Attachments:
1. Translation of the foreign-language material;
MapReduce: Simplified Data Processing on Large Clusters
Jeffrey Dean and Sanjay Ghemawat
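Abstract
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real-world tasks can be expressed in this model.
Programs written in this style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily use the resources of a large distributed system.
Our implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Programmers find the system easy to use: hundreds of MapReduce programs have been implemented, and more than one thousand MapReduce jobs are executed on Google's clusters every day.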
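1 Introduction
Over the past five years, the authors and many others at Google have implemented hundreds of special-purpose computations that process large amounts of raw data, such as crawled documents and web request logs, to compute various kinds of derived data, such as inverted indices, various representations of the graph structure of web documents, summaries of the number of pages crawled per host, and the set of most frequent queries in a given day. Most such computations are conceptually straightforward. However, the input data is usually large, and the computations have to be distributed across hundreds or thousands of machines in order to finish in a reasonable amount of time. The issues of how to parallelize the computation, distribute the data, and handle failures conspire to obscure the original simple computation with large amounts of complex code to deal with these issues.
As a reaction to this complexity, we designed a new abstraction that allows us to express the simple computations we were trying to perform but hides the messy details of parallelization, fault tolerance, data distribution, and load balancing in a library. The abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages. We realized that most of our computations involved applying a map operation to each logical "record" in the input in order to compute a set of intermediate key/value pairs, and then applying a reduce operation to all the values that shared the same key, in order to combine the derived data appropriately. Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance.
The major contributions of this work are a simple and powerful interface that enables automatic parallelization and distribution of large-scale computations, combined with an implementation of this interface that achieves high performance on large clusters of commodity PCs.
Section 2 describes the basic programming model and gives several examples. Section 3 describes an implementation of the MapReduce interface tailored towards our cluster-based computing environment. Section 4 describes several refinements of the programming model that we have found useful. Section 5 has performance measurements of our implementation for a variety of tasks. Section 6 explores the use of MapReduce within Google, including our experiences in using it as the basis for a rewrite of our production indexing system. Section 7 discusses related and future work.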
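2 Programming Model
The computation takes a set of input key/value pairs and produces a set of output key/value pairs. The user of the MapReduce library expresses the computation as two functions: Map and Reduce.
Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.
The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges these values together to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's Reduce function via an iterator, which allows us to handle lists of values that are too large to fit in memory.
2.1 Example
Consider the problem of counting the number of occurrences of each word in a large collection of documents. The user would write code similar to the following pseudo-code: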
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
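The map function emits each word plus an associated count of occurrences (just "1" in this simple example). The reduce function sums together all counts emitted for a particular word.
In addition, the user writes code to fill in a mapreduce specification object with the names of the input and output files and optional tuning parameters. The user then invokes the MapReduce function, passing it the specification object. The user's code is linked together with the MapReduce library (implemented in C++). Appendix A contains the full program text for this example.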
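The data flow of the word-count example can be sketched in a single process. The following is a minimal illustration only, not the library's implementation: the names MapWords, ReduceCounts, and WordCount are hypothetical, and an in-memory std::map stands in for the grouping step that the real library performs across machines.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper names; none of these belong to the MapReduce library.
using KV = std::pair<std::string, std::string>;

// map: emit (word, "1") for every whitespace-separated word.
std::vector<KV> MapWords(const std::string& contents) {
  std::vector<KV> out;
  std::istringstream in(contents);
  std::string w;
  while (in >> w) out.emplace_back(w, "1");
  return out;
}

// reduce: sum the counts emitted for one word.
std::string ReduceCounts(const std::vector<std::string>& values) {
  long long total = 0;
  for (const std::string& v : values) total += std::stoll(v);
  return std::to_string(total);
}

// Sequential driver: map every document, group values by key, reduce each group.
std::map<std::string, std::string> WordCount(const std::vector<std::string>& docs) {
  std::map<std::string, std::vector<std::string>> groups;  // stands in for the shuffle
  for (const std::string& d : docs)
    for (const KV& kv : MapWords(d)) groups[kv.first].push_back(kv.second);
  std::map<std::string, std::string> result;
  for (const auto& g : groups) result[g.first] = ReduceCounts(g.second);
  return result;
}
```

Calling WordCount on the two documents "a b a" and "b c" yields the counts a=2, b=2, c=1. Values stay strings throughout, mirroring the string-based interface described in the text.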
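2.2 Types
Even though the previous pseudo-code is written in terms of string inputs and outputs, conceptually the map and reduce functions supplied by the user have associated types:
map (k1,v1) -> list(k2,v2)
reduce (k2,list(v2)) -> list(v2)
That is, the input keys and values are drawn from a different domain than the output keys and values. Furthermore, the intermediate keys and values are from the same domain as the output keys and values.
Our C++ implementation passes strings to and from the user-defined functions and leaves it to the user code to convert between strings and appropriate types.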
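2.3 More Examples
Here are a few simple examples of interesting programs that can be easily expressed as MapReduce computations.
Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
Count of URL Access Frequency: The map function processes logs of web page requests and outputs (URL, 1). The reduce function adds together all values for the same URL and emits a (URL, total count) pair.
Reverse Web-Link Graph: The map function outputs (target, source) pairs for each link to a target URL found in a page named source. The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair (target, list(source)).
Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of (word, frequency) pairs. The map function emits a (hostname, term vector) pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final (hostname, term vector) pair.
Inverted Index: The map function parses each document and emits a sequence of (word, document ID) pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs, and emits a (word, list(document ID)) pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.
Distributed Sort: The map function extracts the key from each record and emits a (key, record) pair. The reduce function emits all pairs unchanged. This computation depends on the partitioning facilities described in Section 4.1 and the ordering properties described in Section 4.2.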
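3 Implementation
Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.
This section describes an implementation targeted to the computing environment in wide use at Google: large clusters of commodity PCs connected together with switched Ethernet [4].
In our environment:
(1) Machines are typically dual-processor x86 machines running Linux, with 2-4 GB of memory per machine.
(2) Commodity networking hardware is used: typically either 100 megabits/second or 1 gigabit/second at the machine level, but averaging considerably less in overall bisection bandwidth.
(3) A cluster consists of hundreds or thousands of machines, and therefore machine failures are common.
(4) Storage is provided by inexpensive IDE disks attached directly to individual machines. A distributed file system [8] developed in-house is used to manage the data stored on these disks. The file system uses replication to provide availability and reliability on top of unreliable hardware.
(5) Users submit jobs to a scheduling system. Each job consists of a set of tasks and is mapped by the scheduler to a set of available machines within a cluster.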
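3.1 Execution Overview
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
Figure 1: Execution overview
Figure 1 shows the overall flow of a MapReduce operation in our implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in Figure 1 correspond to the numbers in the list below):
1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 MB to 64 MB per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.
2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.
3. A worker that is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined map function. The intermediate key/value pairs produced by the map function are buffered in memory.
4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, which is responsible for forwarding these locations to the reduce workers.
5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.
6. The reduce worker iterates over the sorted intermediate data, and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's reduce function. The output of the reduce function is appended to a final output file for this reduce partition.
7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.
After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file; they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.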
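3.2 Master Data Structures
The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed) and the identity of the worker machine (for non-idle tasks).
The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.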
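The bookkeeping described above might be laid out roughly as follows. This is a sketch under stated assumptions: the type and member names (TaskState, FileRegion, TaskInfo, MasterState, CompleteMap) are hypothetical, and the rule that a repeated completion message is ignored is taken from the fault-tolerance discussion in Section 3.3 rather than from this section.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

enum class TaskState { kIdle, kInProgress, kCompleted };

// One intermediate file region produced by a map task (hypothetical layout).
struct FileRegion {
  std::string location;
  std::uint64_t size_bytes;
};

struct TaskInfo {
  TaskState state = TaskState::kIdle;
  int worker_id = -1;               // meaningful only for non-idle tasks
  std::vector<FileRegion> regions;  // R entries once a map task has completed
};

struct MasterState {
  std::vector<TaskInfo> map_tasks;
  std::vector<TaskInfo> reduce_tasks;

  MasterState(int M, int R) : map_tasks(M), reduce_tasks(R) {}

  // Records a map completion. Returns false for a duplicate completion
  // message, which is ignored.
  bool CompleteMap(int task, int worker, std::vector<FileRegion> regions) {
    TaskInfo& t = map_tasks[task];
    if (t.state == TaskState::kCompleted) return false;
    t.state = TaskState::kCompleted;
    t.worker_id = worker;
    t.regions = std::move(regions);
    return true;
  }
};
```

A real master would also push each newly recorded region to the workers running in-progress reduce tasks; that step is omitted here.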
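3.3 Fault Tolerance
Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.
Worker Failure
The master pings every worker periodically. If no response is received from a worker within a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.
Completed map tasks are re-executed on a failure because their output is stored on the local disk of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed, since their output is stored in a global file system.
When a map task is executed first by worker A and then later by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B.
MapReduce is resilient to large-scale worker failures. For example, during one MapReduce operation, network maintenance on a running cluster was causing groups of 80 machines at a time to become unreachable for several minutes. The MapReduce master simply re-executed the work done by the unreachable worker machines and continued to make forward progress, eventually completing the MapReduce operation.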
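The rescheduling rule for a failed worker can be illustrated with a short sketch. The names State, Task, and HandleWorkerFailure are hypothetical, and ping timeouts and the notification of reduce workers are left out; the point is only the asymmetry between map and reduce tasks.

```cpp
#include <cassert>
#include <vector>

enum class State { kIdle, kInProgress, kCompleted };

struct Task {
  State state = State::kIdle;
  int worker = -1;
};

// Resets tasks owned by a worker that stopped answering pings.
// Returns the number of tasks put back into the idle state.
int HandleWorkerFailure(int failed, std::vector<Task>& maps,
                        std::vector<Task>& reduces) {
  int reset = 0;
  for (Task& t : maps) {
    // Completed map output lives on the failed machine's local disk, so
    // completed and in-progress map tasks are both rescheduled.
    if (t.worker == failed && t.state != State::kIdle) {
      t = Task{};
      ++reset;
    }
  }
  for (Task& t : reduces) {
    // Completed reduce output is already in the global file system, so only
    // in-progress reduce tasks are rescheduled.
    if (t.worker == failed && t.state == State::kInProgress) {
      t = Task{};
      ++reset;
    }
  }
  return reset;
}
```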
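Master Failure
It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore our current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.
Semantics in the Presence of Failures
When the user-supplied map and reduce operators are deterministic functions of their input values, our distributed implementation produces the same output as would have been produced by a non-faulting sequential execution of the entire program.
We rely on atomic commits of map and reduce task outputs to achieve this property. Each in-progress task writes its output to private temporary files. A reduce task produces one such file, and a map task produces R such files (one per reduce task). When a map task completes, the worker sends a message to the master and includes the names of the R temporary files in the message. If the master receives a completion message for an already completed map task, it ignores the message. Otherwise, it records the names of the R files in a master data structure.
When a reduce task completes, the reduce worker atomically renames its temporary output file to the final output file. If the same reduce task is executed on multiple machines, multiple rename calls will be executed for the same final output file. We rely on the atomic rename operation provided by the underlying file system to guarantee that the final file system state contains just the data produced by one execution of the reduce task.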
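The write-then-rename commit can be sketched with the C standard library. This is an illustration under assumptions, not the library's code: CommitOutput and the ".tmp." naming scheme are hypothetical, both paths are assumed to be on the same POSIX file system (where rename replaces an existing target atomically), and durability concerns such as fsync are ignored.

```cpp
#include <cassert>
#include <cstdio>
#include <fstream>
#include <string>

// Writes `data` to a private temporary file, then renames it onto
// `final_path`. `task_tag` keeps the temporary names of concurrent
// executions of the same task distinct.
bool CommitOutput(const std::string& final_path, const std::string& data,
                  const std::string& task_tag) {
  const std::string tmp = final_path + ".tmp." + task_tag;
  {
    std::ofstream out(tmp, std::ios::binary | std::ios::trunc);
    if (!out) return false;
    out << data;
    if (!out.flush()) return false;
  }  // the stream is closed here, before the rename
  if (std::rename(tmp.c_str(), final_path.c_str()) != 0) {
    std::remove(tmp.c_str());
    return false;
  }
  return true;
}
```

If two executions of the same reduce task both call CommitOutput, the final file holds the complete output of exactly one of them, never a mixture, which is the property the text relies on.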
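The vast majority of our map and reduce operators are deterministic, and the fact that our semantics are equivalent to a sequential execution in this case makes it very easy for programmers to reason about their program's behavior. When the map and/or reduce operators are non-deterministic, we provide weaker but still reasonable semantics. In the presence of non-deterministic operators, the output of a particular reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the non-deterministic program. However, the output for a different reduce task R2 may correspond to the output for R2 produced by a different sequential execution of the non-deterministic program.
Consider map task M and reduce tasks R1 and R2. Let e(Ri) be the execution of Ri that committed (there is exactly one such execution). The weaker semantics arise because e(R1) may have read the output produced by one execution of M, and e(R2) may have read the output produced by a different execution of M.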
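3.4 Locality
Network bandwidth is a relatively scarce resource in our computing environment. We conserve network bandwidth by taking advantage of the fact that the input data (managed by GFS [8]) is stored on the local disks of the machines that make up our cluster. GFS divides each file into 64 MB blocks and stores several copies of each block (typically 3 copies) on different machines. The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule a map task near a replica of that task's input data (e.g., on a worker machine that is on the same network switch as the machine containing the data). When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
3.5 Task Granularity
We subdivide the map phase into M pieces and the reduce phase into R pieces, as described above. Ideally, M and R should be much larger than the number of worker machines. Having each worker perform many different tasks improves dynamic load balancing and also speeds up recovery when a worker fails: the many map tasks it has completed can be spread out across all the other worker machines.
There are practical bounds on how large M and R can be in our implementation, since the master must make O(M + R) scheduling decisions and keeps O(M * R) state in memory, as described above. (The constant factors for memory usage are small, however: the O(M * R) piece of the state consists of approximately one byte of data per map task/reduce task pair.)
Furthermore, R is often constrained by users because the output of each reduce task ends up in a separate output file. In practice, we tend to choose M so that each individual task handles roughly 16 MB to 64 MB of input data (so that the locality optimization described above is most effective), and we make R a small multiple of the number of worker machines we expect to use. We often perform MapReduce computations with M = 200,000 and R = 5,000, using 2,000 worker machines.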
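The sizes quoted above can be checked with a little arithmetic. The sketch below is illustrative only; the function names are hypothetical, and the default of one byte per map task/reduce task pair is the approximate figure given in the text.

```cpp
#include <cassert>
#include <cstdint>

// Number of scheduling decisions the master makes: O(M + R).
std::uint64_t SchedulingDecisions(std::uint64_t M, std::uint64_t R) {
  return M + R;
}

// Memory the master needs for per-pair state: O(M * R).
std::uint64_t MasterStateBytes(std::uint64_t M, std::uint64_t R,
                               std::uint64_t bytes_per_pair = 1) {
  return M * R * bytes_per_pair;
}

// Average number of map tasks each worker ends up running.
std::uint64_t MapTasksPerWorker(std::uint64_t M, std::uint64_t workers) {
  return M / workers;
}
```

With M = 200,000, R = 5,000, and 2,000 workers, this gives 205,000 scheduling decisions, 10^9 bytes (about 1 GB) of pair state, and an average of 100 map tasks per worker, which is what makes fine-grained load balancing possible.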
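3.6 Backup Tasks
One of the common causes that lengthens the total time taken for a MapReduce operation is a "straggler": a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation. Stragglers can arise for a whole host of reasons. For example, a machine with a bad disk may experience frequent correctable errors that slow its read performance from 30 MB/s to 1 MB/s. The cluster scheduling system may have scheduled other tasks on the machine, causing it to execute the MapReduce code more slowly due to competition for CPU, memory, local disk, or network bandwidth. A recent problem we experienced was a bug in machine initialization code that caused processor caches to be disabled: computations on affected machines slowed down by over a factor of one hundred.
We have a general mechanism to alleviate the problem of stragglers. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes. We have tuned this mechanism so that it typically increases the computational resources used by the operation by no more than a few percent. We have found that this significantly reduces the time to complete large MapReduce operations. As an example, the sort program described in Section 5.3 takes 44% longer to complete when the backup task mechanism is disabled.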
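4 Refinements
Although the basic functionality provided by simply writing map and reduce functions is sufficient for most needs, we have found a few extensions useful. These are described in this section.
4.1 Partitioning Function
The users of MapReduce specify the number of reduce tasks/output files that they desire (R). Data gets partitioned across these tasks using a partitioning function on the intermediate key. A default partitioning function is provided that uses hashing (e.g., hash(key) mod R). This tends to result in fairly well-balanced partitions. In some cases, however, it is useful to partition data by some other function of the key. For example, sometimes the output keys are URLs, and we want all entries for a single host to end up in the same output file. To support situations like this, the user of the MapReduce library can provide a special partitioning function. For example, using hash(Hostname(urlkey)) mod R as the partitioning function causes all URLs from the same host to end up in the same output file.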
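The two partitioning functions mentioned above might look as follows. This is a sketch, assuming std::hash as the hash function; the Hostname helper here is a hypothetical, simplified extractor that only strips an optional "scheme://" prefix and the path, and does not handle ports or user-info components.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Default-style partitioner: hash(key) mod R.
std::size_t DefaultPartition(const std::string& key, std::size_t R) {
  return std::hash<std::string>{}(key) % R;
}

// Hypothetical hostname extractor: drops "scheme://" and everything from the
// first '/' after the host.
std::string Hostname(const std::string& url) {
  std::size_t start = url.find("://");
  start = (start == std::string::npos) ? 0 : start + 3;
  const std::size_t end = url.find('/', start);
  return url.substr(start,
                    end == std::string::npos ? std::string::npos : end - start);
}

// Host-based partitioner: hash(Hostname(urlkey)) mod R.
std::size_t HostPartition(const std::string& url, std::size_t R) {
  return std::hash<std::string>{}(Hostname(url)) % R;
}
```

Because HostPartition hashes only the host, two URLs on the same host always land in the same partition regardless of their paths, whereas DefaultPartition would usually scatter them.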
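4.2 Ordering Guarantees
We guarantee that within a given partition, the intermediate key/value pairs are processed in increasing key order. This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or when users of the output find it convenient to have the data sorted.
4.3 Combiner Function
In some cases, there is significant repetition in the intermediate keys produced by each map task, and the user-specified reduce function is commutative and associative. A good example of this is the word-counting example in Section 2.1. Since word frequencies tend to follow a Zipf distribution, each map task will produce hundreds or thousands of records of the form (the, 1). All of these counts will be sent over the network to a single reduce task and then added together by the reduce function to produce one number. We allow the user to specify an optional Combiner function that does partial merging of this data before it is sent over the network.
The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions. The only difference between a reduce function and a combiner function is how the MapReduce library handles the output of the function. The output of a reduce function is written to the final output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.
Partial combining significantly speeds up certain classes of MapReduce operations. Appendix A contains an example that uses a combiner.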
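The effect of a combiner on word-count output can be sketched as a local pre-aggregation step. The function name Combine is hypothetical, and the sketch assumes the reduce function is integer addition, which is commutative and associative as the text requires.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

// Collapses repeated keys in one map task's output before anything is sent
// over the network. Each key appears at most once in the result, carrying
// the partial sum of its counts.
std::vector<KV> Combine(const std::vector<KV>& emitted) {
  std::map<std::string, long long> partial;
  for (const KV& kv : emitted) partial[kv.first] += std::stoll(kv.second);
  std::vector<KV> out;
  for (const auto& p : partial)
    out.emplace_back(p.first, std::to_string(p.second));
  return out;
}
```

Feeding the combined pairs into the same summing reduce function gives the same totals as feeding it the raw pairs; only the number of records crossing the network changes.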
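4.4 Input and Output Types
The MapReduce library provides support for reading input data in several different formats. For example, "text" mode input treats each line as a key/value pair: the key is the offset in the file and the value is the contents of the line. Another commonly supported format stores a sequence of key/value pairs sorted by key. Each input type implementation knows how to split itself into meaningful ranges for processing as separate map tasks (e.g., text mode's range splitting ensures that splits occur only at line boundaries). Users can add support for a new input type by providing an implementation of a simple reader interface, though most users just use one of a small number of predefined input types.
A reader does not necessarily need to provide data read from a file. For example, it is easy to define a reader that reads records from a database, or from data structures mapped in memory.
In a similar fashion, we support a set of output types for producing data in different formats, and it is easy for user code to add support for new output types.
4.5 Side-effects
In some cases, users of MapReduce have found it convenient to produce auxiliary files as additional outputs from their map and/or reduce operators. We rely on the application writer to make such side-effects atomic and idempotent. Typically the application writes to a temporary file and atomically renames this file once it has been fully generated.
We do not provide support for atomic two-phase commits of multiple output files produced by a single task. Therefore, tasks that produce multiple output files with cross-file consistency requirements should be deterministic. This restriction has never been an issue in practice.
4.6 Skipping Bad Records
Sometimes there are bugs in user code that cause the map or reduce functions to crash deterministically on certain records. Such bugs prevent a MapReduce operation from completing. The usual course of action is to fix the bug, but sometimes this is not feasible; perhaps the bug is in a third-party library for which source code is unavailable. Also, sometimes it is acceptable to ignore a few records, for example when doing statistical analysis on a large data set. We provide an optional mode of execution where the MapReduce library detects which records cause deterministic crashes and skips these records in order to make forward progress.
Each worker process installs a signal handler that catches segmentation violations and bus errors. Before invoking a user map or reduce operation, the MapReduce library stores the sequence number of the argument in a global variable. If the user code generates a signal, the signal handler sends a "last gasp" UDP packet that contains the sequence number to the MapReduce master. When the master has seen more than one failure on a particular record, it indicates that the record should be skipped when it issues the next re-execution of the corresponding map or reduce task.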
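The master-side half of this mechanism reduces to counting failures per record. The class below is a sketch with hypothetical names (SkipTracker, ReportFailure, ShouldSkip); the signal handler and the UDP transport on the worker side are not modeled.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

class SkipTracker {
 public:
  // Called when a worker's "last gasp" message names the sequence number of
  // the record it was processing when it crashed.
  void ReportFailure(std::uint64_t record_seq) {
    if (++failures_[record_seq] > 1) skip_.insert(record_seq);
  }

  // True once more than one failure has been seen on this record.
  bool ShouldSkip(std::uint64_t record_seq) const {
    return skip_.count(record_seq) > 0;
  }

 private:
  std::map<std::uint64_t, int> failures_;
  std::set<std::uint64_t> skip_;
};
```

Requiring a second failure before skipping means a single transient crash never causes a record to be dropped.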
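4.7 Local Execution
Debugging problems in map or reduce functions can be tricky, since the actual computation happens in a distributed system, often on several thousand machines, with work assignment decisions made dynamically by the master. To help facilitate debugging, profiling, and small-scale testing, we have developed an alternative implementation of the MapReduce library that sequentially executes all of the work for a MapReduce operation on the local machine. Controls are provided to the user so that the computation can be limited to particular map tasks. Users invoke their program with a special flag and can then easily use any debugging or testing tools they find useful (e.g., gdb).
4.8 Status Information
The master runs an internal HTTP server and exports a set of status pages for human consumption. The status pages show the progress of the computation, such as how many tasks have been completed, how many are in progress, bytes of input, bytes of intermediate data, bytes of output, processing rates, and so on. The pages also contain links to the standard error and standard output files generated by each task. The user can use this data to predict how long the computation will take, and whether or not more resources should be added to the computation. These pages can also be used to figure out when the computation is much slower than expected.
In addition, the top-level status page shows which workers have failed, and which map and reduce tasks they were processing when they failed. This information is useful when attempting to diagnose bugs in the user code.
4.9 Counters
The MapReduce library provides a counter facility to count occurrences of various events. For example, user code may want to count the total number of words processed or the number of German documents indexed.
To use this facility, user code creates a named counter object and then increments the counter appropriately in the map and/or reduce function. For example: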
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");
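The counter values from individual worker machines are periodically propagated to the master (piggybacked on the ping response). The master aggregates the counter values from successful map and reduce tasks and returns them to the user code when the MapReduce operation is completed. The current counter values are also displayed on the master status page so that a human can watch the progress of the live computation. When aggregating counter values, the master eliminates the effects of duplicate executions of the same map or reduce task to avoid double counting. (Duplicate executions can arise from backup tasks and from re-execution of tasks due to failures.)
Some counter values are automatically maintained by the MapReduce library, such as the number of input key/value pairs processed and the number of output key/value pairs produced.
Users have found the counter facility useful for sanity checking the behavior of MapReduce operations. For example, in some MapReduce operations, the user code may want to ensure that the number of output pairs produced exactly equals the number of input pairs processed, or that the fraction of German documents processed is within some tolerable fraction of the total number of documents processed.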
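The duplicate elimination the master performs when aggregating counters can be sketched as follows. The class and method names (CounterAggregator, AddTaskCounters, Total) are hypothetical, and the sketch assumes each task is identified by an integer id shared by all of its executions.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

class CounterAggregator {
 public:
  // Adds the counter values of a successfully finished task. A second report
  // for the same task id (from a backup task or a re-execution) is ignored,
  // so nothing is counted twice. Returns true if the values were added.
  bool AddTaskCounters(int task_id,
                       const std::map<std::string, long long>& values) {
    if (!seen_.insert(task_id).second) return false;
    for (const auto& kv : values) totals_[kv.first] += kv.second;
    return true;
  }

  // Aggregated value of a counter, or 0 if it was never reported.
  long long Total(const std::string& name) const {
    const auto it = totals_.find(name);
    return it == totals_.end() ? 0 : it->second;
  }

 private:
  std::set<int> seen_;
  std::map<std::string, long long> totals_;
};
```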
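5 Performance
In this section we measure the performance of MapReduce on two computations running on a large cluster of machines. One computation searches through approximately one terabyte of data looking for a particular pattern. The other computation sorts approximately one terabyte of data.
These two programs are representative of a large subset of the real programs written by users of MapReduce: one class of programs shuffles data from one representation to another, and another class extracts a small amount of interesting data from a large data set.
5.1 Cluster Configuration
All of the programs were executed on a cluster that consisted of approximately 1800 machines. Each machine had two 2 GHz Intel Xeon processors with Hyper-Threading enabled, 4 GB of memory, two 160 GB IDE disks, and a gigabit Ethernet link. The machines were arranged in a two-level tree-shaped switched network with approximately 100-200 Gbps of aggregate bandwidth available at the root. All of the machines were in the same hosting facility, and therefore the round-trip time between any pair of machines was less than a millisecond.
Out of the 4 GB of memory, approximately 1-1.5 GB was reserved by other tasks running on the cluster. The programs were executed on a weekend afternoon, when the CPUs, disks, and network were mostly idle.
5.2 Grep
The grep program scans through 10^10 100-byte records, searching for a relatively rare three-character pattern (the pattern occurs in 92,337 records). The input is split into approximately 64 MB pieces (M = 15000), and the entire output is placed in one file (R = 1).
Figure 2: Data transfer rate over time
Figure 2 shows the progress of the computation over time. The Y-axis shows the rate at which the input data is scanned. The rate gradually picks up as more machines are assigned to this MapReduce computation and peaks at over 30 GB/s when 1764 workers have been assigned. As the map tasks finish, the rate starts dropping and hits zero about 80 seconds into the computation. The entire computation takes approximately 150 seconds from start to finish. This includes about a minute of startup overhead. The overhead is due to the propagation of the program to all worker machines and to delays interacting with GFS to open the set of 1000 input files and to get the information needed for the locality optimization.
5.3 Sort
The sort program sorts 10^10 100-byte records (approximately 1 terabyte of data). This program is modeled after the TeraSort benchmark [10].
The sorting program consists of less than 50 lines of user code. A three-line map function extracts a 10-byte sorting key from a text line and emits the key and the original text line as the intermediate key/value pair. We used a built-in Identity function as the reduce operator. This function passes the intermediate key/value pair unchanged as the output key/value pair. The final sorted output is written to a set of 2-way replicated GFS files (i.e., 2 terabytes are written as the output of the program).
As before, the input data is split into 64 MB pieces (M = 15000). We partition the sorted output into 4000 files (R = 4000). The partitioning function uses the initial bytes of the key to segregate it into one of R pieces.
Our partitioning function for this benchmark has built-in knowledge of the distribution of keys. In a general sorting program, we would add a pre-pass MapReduce operation that would collect a sample of the keys and use the distribution of the sampled keys to compute split points for the final sorting pass.
Figure 3: Data transfer rates over time for different executions of the sort program
Figure 3(a) shows the progress of a normal execution of the sort program. The top-left graph shows the rate at which input is read. The rate peaks at about 13 GB/s and dies off fairly quickly, since all map tasks finish before 200 seconds have elapsed. Note that the input rate is lower than for grep. This is because the sort map tasks spend about half their time and I/O bandwidth writing intermediate output to their local disks. The corresponding intermediate output for grep had negligible size.
The middle-left graph shows the rate at which data is sent over the network from the map tasks to the reduce tasks. This shuffling starts as soon as the first map task completes. The first hump in the graph is for the first batch of approximately 1700 reduce tasks (the entire MapReduce was assigned about 1700 machines, and each machine executes at most one reduce task at a time). Roughly 300 seconds into the computation, some of these first-batch reduce tasks finish and we start shuffling data for the remaining reduce tasks. All of the shuffling is done about 600 seconds into the computation.
The bottom-left graph shows the rate at which sorted data is written to the final output files by the reduce tasks. There is a delay between the end of the first shuffling period and the start of the writing period because the machines are busy sorting the intermediate data. The writes continue at a rate of about 2-4 GB/s for a while. All of the writes finish about 850 seconds into the computation. Including startup overhead, the entire computation takes 891 seconds. This is similar to the best reported result of 1057 seconds for the TeraSort benchmark [18].
A few things to note: the input rate is higher than the shuffle rate and the output rate because of our locality optimization; most data is read from a local disk and bypasses our relatively bandwidth-constrained network. The shuffle rate is higher than the output rate because the output phase writes two copies of the sorted data (we make two replicas of the output for reliability and availability reasons). We write two replicas because that is the mechanism for reliability and availability provided by our underlying file system. Network bandwidth requirements for writing data would be reduced if the underlying file system used erasure coding [14] rather than replication.
5.4 Effect of Backup Tasks
In Figure 3(b), we show an execution of the sort program with backup tasks disabled. The execution flow is similar to that shown in Figure 3(a), except that there is a very long tail where hardly any write activity occurs. After 960 seconds, all except 5 of the reduce tasks are completed. However, these last few stragglers do not finish until 300 seconds later. The entire computation takes 1283 seconds, an increase of 44% in elapsed time.
5.5 Machine Failures
In Figure 3(c), we show an execution of the sort program where we intentionally killed 200 out of 1746 worker processes several minutes into the computation. The underlying cluster scheduler immediately restarted new worker processes on these machines (since only the processes were killed, the machines were still functioning properly).
The worker deaths show up as a negative input rate, since some previously completed map work disappears (because the corresponding map workers were killed) and needs to be redone. The re-execution of this map work happens relatively quickly. The entire computation finishes in 933 seconds including startup overhead (just an increase of 5% over the normal execution time).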
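The two percentages quoted in Sections 5.4 and 5.5 follow directly from the measured times. The helper below is a small, hypothetical illustration of that arithmetic, using the normal execution time of 891 seconds as the baseline.

```cpp
#include <cassert>
#include <cmath>

// Relative increase of `measured` over `baseline`, in percent.
double PercentIncrease(double baseline, double measured) {
  return (measured - baseline) / baseline * 100.0;
}
```

With backup tasks disabled, (1283 - 891) / 891 is roughly 44%; with 200 worker processes killed, (933 - 891) / 891 is roughly 4.7%, which the text rounds to 5%.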
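6 Experience
We wrote the first version of the MapReduce library in February of 2003 and made significant enhancements to it in August of 2003, including the locality optimization and dynamic load balancing of task execution across worker machines. Since that time, the MapReduce library has been applied to a broad range of the problems that we work on. It has been used across a wide range of domains within Google, including:
- large-scale machine learning problems,
- clustering problems for the Google News and Froogle products,
- extraction of data used to produce reports of popular queries (e.g., Google Zeitgeist),
- extraction of properties of web pages for new experiments and products (e.g., extraction of geographical locations from a large corpus of web pages for localized search), and
- large-scale graph computations.
Figure 4: MapReduce instances over time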
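Table 1: MapReduce jobs run in August 2004

Number of jobs                        29,423
Average job completion time           634 secs
Machine days used                     79,186 days

Input data read                       3,288 TB
Intermediate data produced            758 TB
Output data written                   193 TB

Average worker machines per job       157
Average worker deaths per job         1.2
Average map tasks per job             3,351
Average reduce tasks per job          55

Unique map implementations            395
Unique reduce implementations         296
Unique map/reduce combinations        426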
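Figure 4 shows the significant growth in the number of separate MapReduce programs checked into our primary source code management system over time, from 0 in early 2003 to almost 900 separate instances as of late September 2004. MapReduce has been so successful because it makes it possible to write a simple program and run it efficiently on a thousand machines in the course of half an hour, greatly speeding up the development and prototyping cycle. Furthermore, it allows programmers who have no experience with distributed and/or parallel systems to exploit large amounts of resources easily.
At the end of each job, the MapReduce library logs statistics about the computational resources used by the job. In Table 1, we show some statistics for a subset of MapReduce jobs run at Google in August 2004.
6.1 Large-Scale Indexing
One of our most significant uses of MapReduce to date has been a complete rewrite of the production indexing system that produces the data structures used for the Google web search service. The indexing system takes as input a large set of documents that have been retrieved by our crawling system, stored as a set of GFS files. The raw contents for these documents are more than 20 terabytes of data. The indexing process runs as a sequence of five to ten MapReduce operations. Using MapReduce (instead of the ad-hoc distributed passes in the prior version of the indexing system) has provided several benefits:
- The indexing code is simpler, smaller, and easier to understand, because the code that deals with fault tolerance, distribution, and parallelization is hidden within the MapReduce library. For example, the size of one phase of the computation dropped from approximately 3800 lines of C++ code to approximately 700 lines when expressed using MapReduce.
- The performance of the MapReduce library is good enough that we can keep conceptually unrelated computations separate, instead of mixing them together to avoid extra passes over the data. This makes it easy to change the indexing process. For example, one change that took a few months to make in our old indexing system took only a few days to implement in the new system.
- The indexing process has become much easier to operate, because most of the problems caused by machine failures, slow machines, and networking hiccups are dealt with automatically by the MapReduce library without operator intervention. Furthermore, it is easy to improve the performance of the indexing process by adding new machines to the indexing cluster.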
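7 Related Work
Many systems have provided restricted programming models and used the restrictions to parallelize the computation automatically. For example, an associative function can be computed over all prefixes of an N-element array in log N time on N processors using parallel prefix computations [6, 9, 13]. MapReduce can be considered a simplification and distillation of some of these models based on our experience with large real-world computations. More significantly, we provide a fault-tolerant implementation that scales to thousands of processors. In contrast, most of the parallel processing systems have only been implemented on smaller scales and leave the details of handling machine failures to the programmer.
Bulk Synchronous Programming [17] and some MPI primitives [11] provide higher-level abstractions that make it easier for programmers to write parallel programs. A key difference between these systems and MapReduce is that MapReduce exploits a restricted programming model to parallelize the user program automatically and to provide transparent fault tolerance.
Our locality optimization draws its inspiration from techniques such as active disks [12, 15], where computation is pushed into processing elements that are close to local disks, to reduce the amount of data sent across I/O subsystems or the network. We run on commodity processors to which a small number of disks are directly connected instead of running directly on disk controller processors, but the general approach is similar.
Our backup task mechanism is similar to the eager scheduling mechanism employed in the Charlotte System [3]. One of the shortcomings of simple eager scheduling is that if a given task causes repeated failures, the entire computation fails to complete. We fix some instances of this problem with our mechanism for skipping bad records.
The MapReduce implementation relies on an in-house cluster management system that is responsible for distributing and running user tasks on a large collection of shared machines. Though not the focus of this paper, the cluster management system is similar in spirit to other systems such as Condor [16].
The sorting facility that is a part of the MapReduce library is similar in operation to NOW-Sort [1]. Source machines (map workers) partition the data to be sorted and send it to one of R reduce workers. Each reduce worker sorts its data locally (in memory if possible). Of course, NOW-Sort does not have the user-definable Map and Reduce functions that make our library widely applicable.
River [2] provides a programming model where processes communicate with each other by sending data over distributed queues. Like MapReduce, the River system tries to provide good average-case performance even in the presence of non-uniformities introduced by heterogeneous hardware or system perturbations. River achieves this by careful scheduling of disk and network transfers to achieve balanced completion times. MapReduce takes a different approach. By restricting the programming model, the MapReduce framework is able to partition the problem into a large number of fine-grained tasks. These tasks are dynamically scheduled on available workers so that faster workers process more tasks. The restricted programming model also allows us to schedule redundant executions of tasks near the end of the job, which greatly reduces completion time in the presence of non-uniformities (such as slow or stuck workers).
BAD-FS [5] has a very different programming model from MapReduce, and unlike MapReduce, it is targeted to the execution of jobs across a wide-area network. However, there are two fundamental similarities. (1) Both systems use redundant execution to recover from data loss caused by failures. (2) Both use locality-aware scheduling to reduce the amount of data sent across congested network links.
TACC [7] is a system designed to simplify the construction of highly available networked services. Like MapReduce, it relies on re-execution as a mechanism for implementing fault tolerance.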
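8 Conclusions
The MapReduce programming model has been successfully used at Google for many different purposes. We attribute this success to several reasons. First, the model is easy to use, even for programmers without experience with parallel and distributed systems, since it hides the details of parallelization, fault tolerance, locality optimization, and load balancing. Second, a large variety of problems are easily expressible as MapReduce computations. For example, MapReduce is used for the generation of data for Google's production web search service, for sorting, for data mining, for machine learning, and for many other systems. Third, we have developed an implementation of MapReduce that scales to large clusters comprising thousands of machines. The implementation makes efficient use of these machine resources and is therefore suitable for use on many of the large computational problems encountered at Google.
We have learned several things from this work. First, restricting the programming model makes it easy to parallelize and distribute computations and to make such computations fault-tolerant. Second, network bandwidth is a scarce resource. A number of optimizations in our system are therefore targeted at reducing the amount of data sent across the network: the locality optimization allows us to read data from local disks, and writing a single copy of the intermediate data to local disk saves network bandwidth. Third, redundant execution can be used to reduce the impact of slow machines and to handle machine failures and data loss.
9 Acknowledgements
Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people's suggestions for enhancements. MapReduce uses the Google File System (GFS) [8] for its input and output data. We also thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on this paper. The anonymous OSDI reviewers and our shepherd, Eric Brewer, provided many useful suggestions on how the paper could be improved. Finally, we thank all the users of MapReduce within Google's engineering organization for providing helpful feedback, suggestions, and bug reports.
10 References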
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10-22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Hölzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22-28, April 2003.
[5] John Bent, Douglas Thain, Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78-91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29-43, Lake George, New York, 2003.
To appear in OSDI 2004
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401-408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831-838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335-348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68-74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103-111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
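A Word Frequency
This section contains a complete program that counts the number of occurrences of each unique word in a set of input files specified on the command line.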
#include "mapreduce/mapreduce.h"

// User's map function
class WordCounter : public Mapper {
 public:
  virtual void Map(const MapInput& input) {
    const string& text = input.value();
    const int n = text.size();
    for (int i = 0; i < n; ) {
      // Skip past leading whitespace
      while ((i < n) && isspace(text[i]))
        i++;
      // Find word end
      int start = i;
      while ((i < n) && !isspace(text[i]))
        i++;
      if (start < i)
        Emit(text.substr(start,i-start),"1");
    }
  }
};
REGISTER_MAPPER(WordCounter);

// User's reduce function
class Adder : public Reducer {
  virtual void Reduce(ReduceInput* input) {
    // Iterate over all entries with the
    // same key and add the values
    int64 value = 0;
    while (!input->done()) {
      value += StringToInt(input->value());
      input->NextValue();
    }
    // Emit sum for input->key()
    Emit(IntToString(value));
  }
};
REGISTER_REDUCER(Adder);

int main(int argc, char** argv) {
  ParseCommandLineFlags(argc, argv);
  MapReduceSpecification spec;
  // Store list of input files into "spec"
  for (int i = 1; i < argc; i++) {
    MapReduceInput* input = spec.add_input();
    input->set_format("text");
    input->set_filepattern(argv[i]);
    input->set_mapper_class("WordCounter");
  }
  // Specify the output files:
  // /gfs/test/freq-00000-of-00100
  // /gfs/test/freq-00001-of-00100
  // ...
  MapReduceOutput* out = spec.output();
  out->set_filebase("/gfs/test/freq");
  out->set_num_tasks(100);
  out->set_format("text");
  out->set_reducer_class("Adder");
  // Optional: do partial sums within map
  // tasks to save network bandwidth
  out->set_combiner_class("Adder");
  // Tuning parameters: use at most 2000
  // machines and 100 MB of memory per task
  spec.set_machines(2000);
  spec.set_map_megabytes(100);
  spec.set_reduce_megabytes(100);
  // Now run it
  MapReduceResult result;
  if (!MapReduce(spec, &result)) abort();
  // Done: 'result' structure contains info
  // about counters, time taken, number of
  // machines used, etc.
  return 0;
}
2. Original foreign-language text;
MapReduce: Simplified Data Processing on Large Clusters
Jeffrey Dean and Sanjay Ghemawat
jeff@google.com, sanjay@google.com
Google, Inc.
Abstract
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper.
Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.
Our implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Programmers find the system easy to use: hundreds of MapReduce programs have been implemented and upwards of one thousand MapReduce jobs are executed on Google's clusters every day.
1 Introduction
Over the past five years, the authors and many others at Google have implemented hundreds of special-purpose computations that process large amounts of raw data, such as crawled documents, web request logs, etc., to compute various kinds of derived data, such as inverted indices, various representations of the graph structure of web documents, summaries of the number of pages crawled per host, the set of most frequent queries in a given day, etc. Most such computations are conceptually straightforward. However, the input data is usually large and the computations have to be distributed across hundreds or thousands of machines in order to finish in a reasonable amount of time. The issues of how to parallelize the computation, distribute the data, and handle failures conspire to obscure the original simple computation with large amounts of complex code to deal with these issues.
As a reaction to this complexity, we designed a new abstraction that allows us to express the simple computations we were trying to perform but hides the messy details of parallelization, fault-tolerance, data distribution and load balancing in a library. Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages. We realized that most of our computations involved applying a map operation to each logical "record" in our input in order to compute a set of intermediate key/value pairs, and then applying a reduce operation to all the values that shared the same key, in order to combine the derived data appropriately. Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance.
The major contributions of this work are a simple and powerful interface that enables automatic parallelization and distribution of large-scale computations, combined with an implementation of this interface that achieves high performance on large clusters of commodity PCs.
Section 2 describes the basic programming model and gives several examples. Section 3 describes an implementation of the MapReduce interface tailored towards our cluster-based computing environment. Section 4 describes several refinements of the programming model that we have found useful. Section 5 has performance measurements of our implementation for a variety of tasks. Section 6 explores the use of MapReduce within Google including our experiences in using it as the basis for a rewrite of our production indexing system. Section 7 discusses related and future work.
2 Programming Model
The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The user of the MapReduce library expresses the computation as two functions: Map and Reduce.
Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.
The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.
2.1 Example
Consider the problem of counting the number of occurrences of each word in a large collection of documents. The user would write code similar to the following pseudo-code:
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
The map function emits each word plus an associated count of occurrences (just "1" in this simple example). The reduce function sums together all counts emitted for a particular word.
In addition, the user writes code to fill in a mapreduce specification object with the names of the input and output files, and optional tuning parameters. The user then invokes the MapReduce function, passing it the specification object. The user's code is linked together with the MapReduce library (implemented in C++). Appendix A contains the full program text for this example.
2.2 Types
Even though the previous pseudo-code is written in terms of string inputs and outputs, conceptually the map and reduce functions supplied by the user have associated types:
map (k1,v1) -> list(k2,v2)
reduce (k2,list(v2)) -> list(v2)
I.e., the input keys and values are drawn from a different domain than the output keys and values. Furthermore, the intermediate keys and values are from the same domain as the output keys and values.
Our C++ implementation passes strings to and from the user-defined functions and leaves it to the user code to convert between strings and appropriate types.
2.3 More Examples
Here are a few simple examples of interesting programs that can be easily expressed as MapReduce computations.
Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair.
Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named source. The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.
Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.
Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.
Distributed Sort: The map function extracts the key from each record, and emits a <key, record> pair. The reduce function emits all pairs unchanged. This computation depends on the partitioning facilities described in Section 4.1 and the ordering properties described in Section 4.2.
3 Implementation
Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.
This section describes an implementation targeted to the computing environment in wide use at Google: large clusters of commodity PCs connected together with switched Ethernet [4]. In our environment:
(1) Machines are typically dual-processor x86 processors running Linux, with 2-4 GB of memory per machine.
(2) Commodity networking hardware is used: typically either 100 megabits/second or 1 gigabit/second at the machine level, but averaging considerably less in overall bisection bandwidth.
(3) A cluster consists of hundreds or thousands of machines, and therefore machine failures are common.
(4) Storage is provided by inexpensive IDE disks attached directly to individual machines. A distributed file system [8] developed in-house is used to manage the data stored on these disks. The file system uses replication to provide availability and reliability on top of unreliable hardware.
(5) Users submit jobs to a scheduling system. Each job consists of a set of tasks, and is mapped by the scheduler to a set of available machines within a cluster.
3.1 Execution Overview
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
Figure 1 shows the overall flow of a MapReduce operation in our implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in Figure 1 correspond to the numbers in the list below):
1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.
2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.
3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.
4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.
5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.
6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns to the user code.
After successful completion, the output of the mapreduce execution is available in the R output files (one per reduce task, with file names as specified by the user).
Typically, users do not need to combine these R output files into one file; they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.
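The sequence of steps in this section can be imitated on a single machine. The following is a minimal sketch under stated assumptions: the function names are hypothetical, the splits are taken by striding rather than as contiguous byte ranges, everything is held in memory instead of on local disks, and zlib.crc32 stands in for hash(key) because Python's built-in hash of strings varies between processes.

```python
import zlib
from itertools import groupby

def partition(key, R):
    # Stable stand-in for hash(key) mod R.
    return zlib.crc32(key.encode("utf-8")) % R

def run(records, map_fn, reduce_fn, M, R):
    # Step 1: split the input into M pieces (by striding, for brevity).
    splits = [records[i::M] for i in range(M)]
    # Steps 3-4: each map task buffers its output into R regions.
    regions = [[[] for _ in range(R)] for _ in range(M)]
    for m, split in enumerate(splits):
        for key, value in split:
            for ikey, ivalue in map_fn(key, value):
                regions[m][partition(ikey, R)].append((ikey, ivalue))
    # Steps 5-6: each reduce task reads its region from every map task,
    # sorts by key, and reduces each group of equal keys.
    outputs = []
    for r in range(R):
        pairs = sorted(p for m in range(M) for p in regions[m][r])
        out = [reduce_fn(k, [v for _, v in grp])
               for k, grp in groupby(pairs, key=lambda p: p[0])]
        outputs.append(out)  # one output "file" per reduce task
    return outputs

def wc_map(name, text):
    for w in text.split():
        yield w, 1

def wc_reduce(word, counts):
    return word, sum(counts)

docs = [("a", "x y x"), ("b", "y z"), ("c", "x")]
files = run(docs, wc_map, wc_reduce, M=2, R=3)
totals = dict(pair for f in files for pair in f)
```

Because every key is routed to exactly one partition, merging the R outputs yields each word once, which is why users rarely need to combine the output files.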
3.2 Master Data Structures
The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks).
The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
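The bookkeeping described in this section might be modeled as follows. This is a sketch only: the class and field names, the worker names, and the location strings are hypothetical, and the actual representation used by the master is not given in the text.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class State(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"

@dataclass
class Task:
    state: State = State.IDLE
    worker: Optional[str] = None  # worker identity (non-idle tasks only)

@dataclass
class MapTask(Task):
    # For a completed map task: (location, size) of each of the R regions.
    regions: list = field(default_factory=list)

@dataclass
class Master:
    map_tasks: list
    reduce_tasks: list

    def map_completed(self, m, worker, regions):
        # Record where the map task's R intermediate regions live.
        task = self.map_tasks[m]
        task.state, task.worker, task.regions = State.COMPLETED, worker, regions
        # Reduce tasks that should be told about the new locations.
        return [r for r, t in enumerate(self.reduce_tasks)
                if t.state is State.IN_PROGRESS]

master = Master([MapTask() for _ in range(2)], [Task() for _ in range(2)])
master.reduce_tasks[1] = Task(State.IN_PROGRESS, "w7")
notify = master.map_completed(
    0, "w3", [("w3:/tmp/m0-r0", 120), ("w3:/tmp/m0-r1", 80)])
```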
3.3 Fault Tolerance
Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.
Worker Failure
The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.
Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.
When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B.
MapReduce is resilient to large-scale worker failures. For example, during one MapReduce operation, network maintenance on a running cluster was causing groups of 80 machines at a time to become unreachable for several minutes. The MapReduce master simply re-executed the work done by the unreachable worker machines, and continued to make forward progress, eventually completing the MapReduce operation.
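The reset rules for a failed worker can be written down compactly. The sketch below assumes a hypothetical dictionary layout for the master's task table; only the rules themselves come from the text.

```python
def handle_worker_failure(tasks, failed_worker):
    """Reset tasks affected by a failed worker and return their ids.

    `tasks` maps a task id to a dict with keys "kind" ("map" or
    "reduce"), "state" ("idle", "in-progress", "completed") and "worker".
    """
    reset = []
    for task_id, t in tasks.items():
        if t["worker"] != failed_worker:
            continue
        in_progress = t["state"] == "in-progress"
        # Completed map output lives on the failed machine's local disk,
        # so it must be redone; completed reduce output is in the global
        # file system and is kept.
        lost_map_output = t["kind"] == "map" and t["state"] == "completed"
        if in_progress or lost_map_output:
            t["state"], t["worker"] = "idle", None
            reset.append(task_id)
    return reset

tasks = {
    "m0": {"kind": "map", "state": "completed", "worker": "A"},
    "m1": {"kind": "map", "state": "in-progress", "worker": "B"},
    "r0": {"kind": "reduce", "state": "completed", "worker": "A"},
    "r1": {"kind": "reduce", "state": "in-progress", "worker": "A"},
}
reset = handle_worker_failure(tasks, "A")
```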
Master Failure
It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore our current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.
Semantics in the Presence of Failures
When the user-supplied map and reduce operators are deterministic functions of their input values, our distributed implementation produces the same output as would have been produced by a non-faulting sequential execution of the entire program.
We rely on atomic commits of map and reduce task outputs to achieve this property. Each in-progress task writes its output to private temporary files. A reduce task produces one such file, and a map task produces R such files (one per reduce task). When a map task completes, the worker sends a message to the master and includes the names of the R temporary files in the message. If the master receives a completion message for an already completed map task, it ignores the message. Otherwise, it records the names of R files in a master data structure.
When a reduce task completes, the reduce worker atomically renames its temporary output file to the final output file. If the same reduce task is executed on multiple machines, multiple rename calls will be executed for the same final output file. We rely on the atomic rename operation provided by the underlying file system to guarantee that the final file system state contains just the data produced by one execution of the reduce task.
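The write-then-rename pattern can be tried on a local file system. This sketch is an analogy rather than the implementation: it uses Python's os.replace on a local directory instead of the distributed file system, and the file name part-00000 is made up for the example.

```python
import os
import tempfile

def commit_output(final_path, data):
    # Write to a private temporary file in the same directory, then
    # rename it onto the final name. os.replace is atomic on POSIX when
    # both paths are on the same file system.
    directory = os.path.dirname(os.path.abspath(final_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        f.write(data)
    os.replace(tmp_path, final_path)

workdir = tempfile.mkdtemp()
final = os.path.join(workdir, "part-00000")
commit_output(final, "first execution\n")
commit_output(final, "second execution\n")  # duplicate run of the same task
with open(final) as f:
    contents = f.read()
```

Whichever rename lands last wins, and a reader never observes a partially written file or a mixture of the two executions.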
The vast majority of our map and reduce operators are deterministic, and the fact that our semantics are equivalent to a sequential execution in this case makes it very easy for programmers to reason about their program's behavior.
When the map and/or reduce operators are non-deterministic, we provide weaker but still reasonable semantics. In the presence of non-deterministic operators, the output of a particular reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the non-deterministic program. However, the output for a different reduce task R2 may correspond to the output for R2 produced by a different sequential execution of the non-deterministic program.
Consider map task M and reduce tasks R1 and R2. Let e(Ri) be the execution of Ri that committed (there is exactly one such execution). The weaker semantics arise because e(R1) may have read the output produced by one execution of M and e(R2) may have read the output produced by a different execution of M.
3.4 Locality
Network bandwidth is a relatively scarce resource in our computing environment. We conserve network bandwidth by taking advantage of the fact that the input data (managed by GFS [8]) is stored on the local disks of the machines that make up our cluster. GFS divides each file into 64 MB blocks, and stores several copies of each block (typically 3 copies) on different machines. The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule a map task near a replica of that task's input data (e.g., on a worker machine that is on the same network switch as the machine containing the data). When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
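The scheduling preference described above amounts to a three-level choice. The following sketch uses hypothetical names and data structures; the final fallback to any idle worker is an assumption of this sketch, since the text only describes the first two preferences.

```python
def pick_worker(idle_workers, replica_machines, switch_of):
    """Choose an idle worker for a map task, preferring data locality.

    idle_workers: list of idle machine names.
    replica_machines: set of machines holding a replica of the split.
    switch_of: dict mapping machine name -> network switch id.
    """
    # First choice: a machine that holds a replica of the input data.
    for w in idle_workers:
        if w in replica_machines:
            return w
    # Second choice: a machine on the same switch as some replica.
    replica_switches = {switch_of[m] for m in replica_machines}
    for w in idle_workers:
        if switch_of[w] in replica_switches:
            return w
    # Assumed fallback: any idle worker, or None if there is none.
    return idle_workers[0] if idle_workers else None

switch_of = {"a": 1, "b": 1, "c": 2, "d": 2}
local = pick_worker(["c", "a"], {"a"}, switch_of)
nearby = pick_worker(["c", "b"], {"a"}, switch_of)
remote = pick_worker(["c"], {"a"}, switch_of)
```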
3.5 Task Granularity
We subdivide the map phase into M pieces and the reduce phase into R pieces, as described above. Ideally, M and R should be much larger than the number of worker machines. Having each worker perform many different tasks improves dynamic load balancing, and also speeds up recovery when a worker fails: the many map tasks it has completed can be spread out across all the other worker machines.
There are practical bounds on how large M and R can be in our implementation, since the master must make O(M + R) scheduling decisions and keeps O(M * R) state in memory as described above. (The constant factors for memory usage are small, however: the O(M * R) piece of the state consists of approximately one byte of data per map task/reduce task pair.)
Furthermore, R is often constrained by users because the output of each reduce task ends up in a separate output file. In practice, we tend to choose M so that each individual task is roughly 16 MB to 64 MB of input data (so that the locality optimization described above is most effective), and we make R a small multiple of the number of worker machines we expect to use. We often perform MapReduce computations with M = 200,000 and R = 5,000, using 2,000 worker machines.
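Plugging the quoted figures into the bounds gives a sense of scale. The arithmetic below is a back-of-the-envelope check that takes "approximately one byte per map task/reduce task pair" literally; the variable names are chosen for this example.

```python
M, R, workers = 200_000, 5_000, 2_000

decisions = M + R                      # O(M + R) scheduling decisions
state_bytes = M * R * 1                # about one byte per task pair
map_tasks_per_worker = M // workers    # map tasks each worker handles on average
reduce_multiple = R / workers          # R as a multiple of the worker count
```

Under these assumptions the master makes 205,000 scheduling decisions and holds about 10^9 bytes (roughly 1 GB) of pair state, while each worker handles on the order of 100 map tasks.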
3.6 Backup Tasks
One of the common causes that lengthens the total time taken for a MapReduce operation is a "straggler": a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation.
Stragglers can arise for a whole host of reasons. For example, a machine with a bad disk may experience frequent correctable errors that slow its read performance from 30 MB/s to 1 MB/s. The cluster scheduling system may have scheduled other tasks on the machine, causing it to execute the MapReduce code more slowly due to competition for CPU, memory, local disk, or network bandwidth. A recent problem we experienced was a bug in machine initialization code that caused processor caches to be disabled: computations on affected machines slowed down by over a factor of one hundred.
We have a general mechanism to alleviate the problem of stragglers. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes. We have tuned this mechanism so that it typically increases the computational resources used by the operation by no more than a few percent. We have found that this significantly reduces the time to complete large MapReduce operations. As an example, the sort program described in Section 5.3 takes 44% longer to complete when the backup task mechanism is disabled.
4 Refinements
Although the basic functionality provided by simply writing Map and Reduce functions is sufficient for most needs, we have found a few extensions useful. These are described in this section.
4.1 Partitioning Function
The users of MapReduce specify the number of reduce tasks/output files that they desire (R). Data gets partitioned across these tasks using a partitioning function on the intermediate key. A default partitioning function is provided that uses hashing (e.g. "hash(key) mod R"). This tends to result in fairly well-balanced partitions. In some cases, however, it is useful to partition data by some other function of the key. For example, sometimes the output keys are URLs, and we want all entries for a single host to end up in the same output file. To support situations like this, the user of the MapReduce library can provide a special partitioning function. For example, using "hash(Hostname(urlkey)) mod R" as the partitioning function causes all URLs from the same host to end up in the same output file.
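A host-based partitioning function is short to write. In this sketch, urllib.parse.urlsplit plays the role of Hostname and zlib.crc32 plays the role of hash; both are stand-ins chosen for the example, as are the function names and URLs.

```python
import zlib
from urllib.parse import urlsplit

def stable_hash(s):
    # Deterministic across processes, unlike Python's built-in hash().
    return zlib.crc32(s.encode("utf-8"))

def default_partition(key, R):
    # "hash(key) mod R"
    return stable_hash(key) % R

def host_partition(url_key, R):
    # "hash(Hostname(urlkey)) mod R"
    host = urlsplit(url_key).hostname or ""
    return stable_hash(host) % R

R = 16
a = host_partition("http://example.com/a.html", R)
b = host_partition("http://example.com/dir/b.html", R)
```

Both URLs map to the same partition because only the host name is hashed, whereas default_partition would generally scatter them.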
4.2 Ordering Guarantees
We guarantee that within a given partition, the intermediate key/value pairs are processed in increasing key order.
This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or users of the output find it convenient to have the data sorted.
4.3 Combiner Function
In some cases, there is significant repetition in the intermediate keys produced by each map task, and the user-specified Reduce function is commutative and associative. A good example of this is the word counting example in Section 2.1. Since word frequencies tend to follow a Zipf distribution, each map task will produce hundreds or thousands of records of the form <the, 1>. All of these counts will be sent over the network to a single reduce task and then added together by the Reduce function to produce one number. We allow the user to specify an optional Combiner function that does partial merging of this data before it is sent over the network.
The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions.
The only difference between a reduce function and a combiner function is how the MapReduce library handles the output of the function. The output of a reduce function is written to the final output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.
Partial combining significantly speeds up certain classes of MapReduce operations. Appendix A contains an example that uses a combiner.
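The effect of partial combining on the word count example can be seen in a few lines. This is a sketch with hypothetical function names; it is not the example from Appendix A.

```python
from collections import Counter

def map_words(text):
    # Word-count map: emit <word, 1> for every word.
    return [(w, 1) for w in text.split()]

def combine(pairs):
    # Partial merge on the map machine. This is valid because addition
    # is commutative and associative.
    partial = Counter()
    for word, count in pairs:
        partial[word] += count
    return sorted(partial.items())

raw = map_words("the cat and the dog and the bird")
combined = combine(raw)
```

Here eight intermediate records shrink to five before anything crosses the network, and the reduce function still sees the same totals.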
4.4 Input and Output Types
The MapReduce library provides support for reading input data in several different formats. For example, "text" mode input treats each line as a key/value pair: the key is the offset in the file and the value is the contents of the line. Another common supported format stores a sequence of key/value pairs sorted by key. Each input type implementation knows how to split itself into meaningful ranges for processing as separate map tasks (e.g. text mode's range splitting ensures that range splits occur only at line boundaries). Users can add support for a new input type by providing an implementation of a simple reader interface, though most users just use one of a small number of predefined input types.
A reader does not necessarily need to provide data read from a file. For example, it is easy to define a reader that reads records from a database, or from data structures mapped in memory.
In a similar fashion, we support a set of output types for producing data in different formats and it is easy for user code to add support for new output types.
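A text-mode reader that respects line boundaries might look like the sketch below. The function name and signature are assumptions, and the rule used here (a range owns exactly the lines that begin inside it) is one common way to keep splits on line boundaries, not necessarily the one used by the library.

```python
import io

def text_reader(stream, start, end):
    """Yield (offset, line) for every line that begins in [start, end).

    `stream` is a seekable binary file object.
    """
    if start > 0:
        # Skip to the first line that begins at or after `start`; a line
        # that began earlier belongs to the previous range.
        stream.seek(start - 1)
        stream.readline()
    else:
        stream.seek(0)
    while True:
        offset = stream.tell()
        if offset >= end:
            break
        line = stream.readline()
        if not line:
            break
        yield offset, line.rstrip(b"\n").decode("utf-8")

data = io.BytesIO(b"one\ntwo\nthree\n")
first = list(text_reader(data, 0, 6))    # range boundary falls inside "two"
second = list(text_reader(data, 6, 14))
```

Although the boundary at byte 6 falls in the middle of a line, every line is delivered exactly once and whole.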
4.5 Side-effects
In some cases, users of MapReduce have found it convenient to produce auxiliary files as additional outputs from their map and/or reduce operators. We rely on the application writer to make such side-effects atomic and idempotent. Typically the application writes to a temporary file and atomically renames this file once it has been fully generated.
We do not provide support for atomic two-phase commits of multiple output files produced by a single task. Therefore, tasks that produce multiple output files with cross-file consistency requirements should be deterministic. This restriction has never been an issue in practice.
4.6 Skipping Bad Records
Sometimes there are bugs in user code that cause the Map or Reduce functions to crash deterministically on certain records. Such bugs prevent a MapReduce operation from completing. The usual course of action is to fix the bug, but sometimes this is not feasible; perhaps the bug is in a third-party library for which source code is unavailable. Also, sometimes it is acceptable to ignore a few records, for example when doing statistical analysis on a large data set. We provide an optional mode of execution where the MapReduce library detects which records cause deterministic crashes and skips these records in order to make forward progress.
Each worker process installs a signal handler that catches segmentation violations and bus errors. Before invoking a user Map or Reduce operation, the MapReduce library stores the sequence number of the argument in a global variable. If the user code generates a signal, the signal handler sends a "last gasp" UDP packet that contains the sequence number to the MapReduce master. When the master has seen more than one failure on a particular record, it indicates that the record should be skipped when it issues the next re-execution of the corresponding Map or Reduce task.
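The master-side rule ("skip a record after more than one failure") can be simulated without signals or UDP. In this sketch a Python exception stands in for the crash and a method call stands in for the "last gasp" packet; the class and function names are hypothetical.

```python
from collections import Counter

class SkipTracker:
    """Master-side bookkeeping: which records to skip on re-execution."""

    def __init__(self):
        self.failures = Counter()

    def report_failure(self, seq):
        # Called when a worker's "last gasp" message names record `seq`.
        self.failures[seq] += 1

    def skip_set(self):
        # Skip records that have failed more than once.
        return {s for s, n in self.failures.items() if n > 1}

def run_task(records, fn, tracker):
    skip = tracker.skip_set()
    out = []
    for seq, rec in enumerate(records):
        if seq in skip:
            continue
        try:
            out.append(fn(rec))
        except Exception:
            # Stand-in for the signal handler: report, then abort the task.
            tracker.report_failure(seq)
            return None
    return out

tracker = SkipTracker()
records = ["1", "2", "oops", "4"]
attempts = [run_task(records, int, tracker) for _ in range(3)]
```

The first two attempts fail on the same record; the third skips it and completes.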
4.7 Local Execution
Debugging problems in Map or Reduce functions can be tricky, since the actual computation happens in a distributed system, often on several thousand machines, with work assignment decisions made dynamically by the master. To help facilitate debugging, profiling, and small-scale testing, we have developed an alternative implementation of the MapReduce library that sequentially executes all of the work for a MapReduce operation on the local machine. Controls are provided to the user so that the computation can be limited to particular map tasks. Users invoke their program with a special flag and can then easily use any debugging or testing tools they find useful (e.g. gdb).
4.8 Status Information
The master runs an internal HTTP server and exports a set of status pages for human consumption. The status pages show the progress of the computation, such as how many tasks have been completed, how many are in progress, bytes of input, bytes of intermediate data, bytes of output, processing rates, etc. The pages also contain links to the standard error and standard output files generated by each task. The user can use this data to predict how long the computation will take, and whether or not more resources should be added to the computation. These pages can also be used to figure out when the computation is much slower than expected.
In addition, the top-level status page shows which workers have failed, and which map and reduce tasks they were processing when they failed. This information is useful when attempting to diagnose bugs in the user code.
4.9 Counters
The MapReduce library provides a counter facility to count occurrences of various events. For example, user code may want to count the total number of words processed or the number of German documents indexed, etc.
To use this facility, user code creates a named counter object and then increments the counter appropriately in the Map and/or Reduce function. For example:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
  for each word w in contents:
    if (IsCapitalized(w)):
      uppercase->Increment();
    EmitIntermediate(w, "1");
The counter values from individual worker machines are periodically propagated to the master (piggybacked on the ping response). The master aggregates the counter values from successful map and reduce tasks and returns them to the user code when the MapReduce operation is completed. The current counter values are also displayed on the master status page so that a human can watch the progress of the live computation. When aggregating counter values, the master eliminates the effects of duplicate executions of the same map or reduce task to avoid double counting. (Duplicate executions can arise from our use of backup tasks and from re-execution of tasks due to failures.)
Some counter values are automatically maintained by the MapReduce library, such as the number of input key/value pairs processed and the number of output key/value pairs produced.
Users have found the counter facility useful for sanity checking the behavior of MapReduce operations. For example, in some MapReduce operations, the user code may want to ensure that the number of output pairs produced exactly equals the number of input pairs processed, or that the fraction of German documents processed is within some tolerable fraction of the total number of documents processed.
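The de-duplication performed during counter aggregation can be sketched as follows. The class name, method name, and task identifiers are hypothetical; the sketch keeps the first report for each task and ignores later ones, which is one simple way to avoid double counting.

```python
from collections import Counter

class CounterAggregator:
    def __init__(self):
        self.done = set()        # tasks whose counters were already added
        self.totals = Counter()

    def task_completed(self, task_id, counters):
        # Count each task once, even if a backup execution or a
        # re-execution of the same task also reports its counters.
        if task_id in self.done:
            return
        self.done.add(task_id)
        self.totals.update(counters)

agg = CounterAggregator()
agg.task_completed("map-0", {"uppercase": 3})
agg.task_completed("map-1", {"uppercase": 2})
agg.task_completed("map-0", {"uppercase": 3})  # backup execution of map-0
```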
5 Performance
In this section we measure the performance of MapReduce on two computations running on a large cluster of machines. One computation searches through approximately one terabyte of data looking for a particular pattern. The other computation sorts approximately one terabyte of data.
These two programs are representative of a large subset of the real programs written by users of MapReduce: one class of programs shuffles data from one representation to another, and another class extracts a small amount of interesting data from a large data set.
5.1 Cluster Configuration
All of the programs were executed on a cluster that consisted of approximately 1800 machines. Each machine had two 2GHz Intel Xeon processors with Hyper-Threading enabled, 4GB of memory, two 160GB IDE disks, and a gigabit Ethernet link. The machines were arranged in a two-level tree-shaped switched network with approximately 100-200 Gbps of aggregate bandwidth available at the root. All of the machines were in the same hosting facility and therefore the round-trip time between any pair of machines was less than a millisecond.
Out of the 4GB of memory, approximately 1-1.5GB was reserved by other tasks running on the cluster. The programs were executed on a weekend afternoon, when the CPUs, disks, and network were mostly idle.
5.2 Grep
The grep program scans through 10^10 100-byte records, searching for a relatively rare three-character pattern (the pattern occurs in 92,337 records). The input is split into approximately 64MB pieces (M = 15000), and the entire output is placed in one file (R = 1).
Figure 2 shows the progress of the computation over time. The Y-axis shows the rate at which the input data is scanned. The rate gradually picks up as more machines are assigned to this MapReduce computation, and peaks at over 30 GB/s when 1764 workers have been assigned. As the map tasks finish, the rate starts dropping and hits zero about 80 seconds into the computation. The entire computation takes approximately 150 seconds from start to finish. This includes about a minute of startup overhead. The overhead is due to the propagation of the program to all worker machines, and delays interacting with GFS to open the set of 1000 input files and to get the information needed for the locality optimization.
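The figures quoted for grep are mutually consistent, as a quick calculation shows. The sketch below assumes that 64MB means 64 * 2^20 bytes and that 30 GB/s means 30 * 10^9 bytes per second; neither convention is stated in the text.

```python
records, record_bytes = 10**10, 100
total_bytes = records * record_bytes      # 10**12 bytes, about 1 TB

split_bytes = 64 * 2**20                  # assumed meaning of "64MB"
splits = -(-total_bytes // split_bytes)   # ceiling division

peak_rate = 30e9                          # assumed meaning of "30 GB/s"
per_worker = peak_rate / 1764             # bytes per second per worker
```

This gives just under 15,000 splits, matching M = 15000, and a peak scan rate of roughly 17 MB/s per worker.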
5.3 Sort
The sort program sorts 10^10 100-byte records (approximately 1 terabyte of data). This program is modeled after the TeraSort benchmark [10].
The sorting program consists of less than 50 lines of user code. A three-line Map function extracts a 10-byte sorting key from a text line and emits the key and the original text line as the intermediate key/value pair. We used a built-in Identity function as the Reduce operator. This function passes the intermediate key/value pair unchanged as the output key/value pair. The final sorted output is written to a set of 2-way replicated GFS files (i.e., 2 terabytes are written as the output of the program).
As before, the input data is split into 64MB pieces (M = 15000). We partition the sorted output into 4000 files (R = 4000). The partitioning function uses the initial bytes of the key to segregate it into one of R pieces.
Our partitioning function for this benchmark has built-in knowledge of the distribution of keys. In a general sorting program, we would add a pre-pass MapReduce operation that would collect a sample of the keys and use the distribution of the sampled keys to compute split points for the final sorting pass.
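The sampling pre-pass mentioned for a general sorting program could compute split points along the following lines. This is a sketch of the general idea only: the function names, the sample size, and the use of evenly spaced quantiles are assumptions, and the benchmark itself did not use sampling.

```python
import bisect
import random

def compute_split_points(sample, R):
    # Choose R - 1 boundaries at evenly spaced quantiles of the sample.
    s = sorted(sample)
    return [s[(i * len(s)) // R] for i in range(1, R)]

def range_partition(key, split_points):
    # Index of the piece whose key range contains `key`.
    return bisect.bisect_right(split_points, key)

rng = random.Random(0)
keys = [rng.random() for _ in range(10_000)]
points = compute_split_points(rng.sample(keys, 500), R=4)

sizes = [0] * 4
for k in keys:
    sizes[range_partition(k, points)] += 1
```

Because range_partition is monotonic in the key, concatenating the R sorted outputs in partition order yields a globally sorted result, and the sampled quantiles keep the pieces roughly equal in size.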
Figure 3 (a) shows the progress of a normal execution of the sort program. The top-left graph shows the rate at which input is read. The rate peaks at about 13 GB/s and dies off fairly quickly since all map tasks finish before 200 seconds have elapsed. Note that the input rate is less than for grep. This is because the sort map tasks spend about half their time and I/O bandwidth writing intermediate output to their local disks. The corresponding intermediate output for grep had negligible size.
The middle-left graph shows the rate at which data is sent over the network from the map tasks to the reduce tasks. This shuffling starts as soon as the first map task completes. The first hump in the graph is for the first batch of approximately 1700 reduce tasks (the entire MapReduce was assigned about 1700 machines, and each machine executes at most one reduce task at a time). Roughly 300 seconds into the computation, some of the first batch of reduce tasks finish and we start shuffling data for the remaining reduce tasks. All of the shuffling is done about 600 seconds into the computation.
The bottom-left graph shows the rate at which sorted data is written to the final output files by the reduce tasks. There is a delay between the end of the first shuffling period and the start of the writing period because the machines are busy sorting the intermediate data. The writes continue at a rate of about 2-4 GB/s for a while. All of the writes finish about 850 seconds into the computation. Including startup overhead, the entire computation takes 891 seconds. This is similar to the current best reported result of 1057 seconds for the TeraSort benchmark [18].
A few things to note: the input rate is higher than the shuffle rate and the output rate because of our locality optimization: most data is read from a local disk and bypasses our relatively bandwidth constrained network. The shuffle rate is higher than the output rate because the output phase writes two copies of the sorted data (we make two replicas of the output for reliability and availability reasons). We write two replicas because that is the mechanism for reliability and availability provided by our underlying file system. Network bandwidth requirements for writing data would be reduced if the underlying file system used erasure coding [14] rather than replication.
5.4 Effect of Backup Tasks
In Figure 3 (b), we show an execution of the sort program with backup tasks disabled. The execution flow is similar to that shown in Figure 3 (a), except that there is a very long tail where hardly any write activity occurs. After 960 seconds, all except 5 of the reduce tasks are completed. However, these last few stragglers don't finish until 300 seconds later. The entire computation takes 1283 seconds, an increase of 44% in elapsed time.
5.5 Machine Failures
In Figure 3 (c), we show an execution of the sort program where we intentionally killed 200 out of 1746 worker processes several minutes into the computation. The underlying cluster scheduler immediately restarted new worker processes on these machines (since only the processes were killed, the machines were still functioning properly).
The worker deaths show up as a negative input rate since some previously completed map work disappears (since the corresponding map workers were killed) and needs to be redone. The re-execution of this map work happens relatively quickly. The entire computation finishes in 933 seconds including startup overhead (just an increase of 5% over the normal execution time).
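The percentages quoted in Sections 5.4 and 5.5 follow directly from the reported elapsed times. The short check below uses only the three times given in the text; the variable names are chosen for this example.

```python
normal, no_backup, with_failures = 891, 1283, 933  # elapsed seconds

backup_penalty = (no_backup - normal) / normal        # backup tasks disabled
failure_penalty = (with_failures - normal) / normal   # 200 workers killed
```

The first ratio is about 0.44 and the second about 0.047, in line with the stated increases of 44% and 5%.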
6 Experience
We wrote the first version of the MapReduce library in February of 2003, and made significant enhancements to it in August of 2003, including the locality optimization, dynamic load balancing of task execution across worker machines, etc. Since that time, we have been pleasantly surprised at how broadly applicable the MapReduce library has been for the kinds of problems we work on.
It has been used across a wide range of domains within Google, including:
• large-scale machine learning problems,
• clustering problems for the Google News and Froogle products,
• extraction of data used to produce reports of popular queries (e.g. Google Zeitgeist),
• extraction of properties of web pages for new experiments and products (e.g. extraction of geographical locations from a large corpus of web pages for localized search), and
• large-scale graph computations.
Figure 4 shows the significant growth in the number of separate MapReduce programs checked into our primary source code management system over time, from 0 in early 2003 to almost 900 separate instances as of late September 2004. MapReduce has been so successful because it makes it possible to write a simple program and run it efficiently on a thousand machines in the course of half an hour, greatly speeding up the development and prototyping cycle. Furthermore, it allows programmers who have no experience with distributed and/or parallel systems to exploit large amounts of resources easily.
At the end of each job, the MapReduce library logs statistics about the computational resources used by the job. In Table 1, we show some statistics for a subset of MapReduce jobs run at Google in August 2004.
6.1 Large-Scale Indexing
One of our most significant uses of MapReduce to date has been a complete rewrite of the production indexing system that produces the data structures used for the Google web search service. The indexing system takes as input a large set of documents that have been retrieved by our crawling system, stored as a set of GFS files. The raw contents for these documents are more than 20 terabytes of data. The indexing process runs as a sequence of five to ten MapReduce operations. Using MapReduce (instead of the ad-hoc distributed passes in the prior version of the indexing system) has provided several benefits:
• The indexing code is simpler, smaller, and easier to understand, because the code that deals with fault tolerance, distribution and parallelization is hidden within the MapReduce library. For example, the size of one phase of the computation dropped from approximately 3800 lines of C++ code to approximately 700 lines when expressed using MapReduce.
• The performance of the MapReduce library is good enough that we can keep conceptually unrelated computations separate, instead of mixing them together to avoid extra passes over the data. This makes it easy to change the indexing process. For example, one change that took a few months to make in our old indexing system took only a few days to implement in the new system.
• The indexing process has become much easier to operate, because most of the problems caused by machine failures, slow machines, and networking hiccups are dealt with automatically by the MapReduce library without operator intervention. Furthermore, it is easy to improve the performance of the indexing process by adding new machines to the indexing cluster.
7 Related Work
Many systems have provided restricted programming models and used the restrictions to parallelize the computation automatically. For example, an associative function can be computed over all prefixes of an N-element array in log N time on N processors using parallel prefix computations [6, 9, 13]. MapReduce can be considered a simplification and distillation of some of these models based on our experience with large real-world computations.
More significantly, we provide a fault-tolerant implementation that scales to thousands of processors. In contrast, most of the parallel processing systems have only been implemented on smaller scales and leave the details of handling machine failures to the programmer.
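The parallel prefix computation mentioned above can be illustrated with a minimal sketch. It is not taken from the cited works: the function name PrefixScan and the round counter are hypothetical, and the sketch simulates the parallel rounds sequentially. Within one round, every element update is independent of the others, so N processors could perform a round as a single step, giving roughly log N steps in total.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Inclusive prefix scan in ceil(log2 N) rounds (Hillis-Steele style).
// `op` must be associative. The inner loop of each round has no
// dependencies between iterations, so it could run on N processors at once;
// this sketch simply simulates those rounds one element at a time.
// If `rounds_out` is non-null, it receives the number of rounds executed.
template <typename T, typename Op>
std::vector<T> PrefixScan(std::vector<T> values, Op op,
                          int* rounds_out = nullptr) {
  const std::size_t n = values.size();
  int rounds = 0;
  for (std::size_t stride = 1; stride < n; stride *= 2) {
    // Double buffering: every read in this round sees the previous round.
    std::vector<T> next = values;
    for (std::size_t i = stride; i < n; ++i) {
      next[i] = op(values[i - stride], values[i]);
    }
    values.swap(next);
    ++rounds;
  }
  if (rounds_out != nullptr) *rounds_out = rounds;
  return values;
}
```

For a five-element input and addition as the operator, the sketch runs three rounds (strides 1, 2, and 4) and returns the running sums.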
Bulk Synchronous Programming [17] and some MPI primitives [11] provide higher-level abstractions that make it easier for programmers to write parallel programs. A key difference between these systems and MapReduce is that MapReduce exploits a restricted programming model to parallelize the user program automatically and to provide transparent fault-tolerance.
Our locality optimization draws its inspiration from techniques such as active disks [12, 15], where computation is pushed into processing elements that are close to local disks, to reduce the amount of data sent across I/O subsystems or the network. We run on commodity processors to which a small number of disks are directly connected instead of running directly on disk controller processors, but the general approach is similar.
Our backup task mechanism is similar to the eager scheduling mechanism employed in the Charlotte System [3]. One of the shortcomings of simple eager scheduling is that if a given task causes repeated failures, the entire computation fails to complete. We fix some instances of this problem with our mechanism for skipping bad records.
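The idea of skipping bad records can be sketched as follows. This is only an illustration under stated assumptions, not the library's mechanism: the names TaskResult and RunWithBadRecordSkipping are hypothetical, a thrown exception stands in for a task crash, and the threshold of two failures per record is an assumed default.

```cpp
#include <cstddef>
#include <exception>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

struct TaskResult {
  std::size_t processed = 0;         // records handled successfully
  std::vector<std::size_t> skipped;  // indices of records that were skipped
};

// Re-executes a task until one attempt completes. A record is skipped once
// it has caused `failures_before_skip` failed attempts (assumed threshold).
TaskResult RunWithBadRecordSkipping(
    const std::vector<std::string>& records,
    const std::function<void(const std::string&)>& process,
    int failures_before_skip = 2) {
  std::map<std::size_t, int> failure_count;  // record index -> failures seen
  while (true) {
    TaskResult result;
    bool failed = false;
    for (std::size_t i = 0; i < records.size(); ++i) {
      auto it = failure_count.find(i);
      if (it != failure_count.end() && it->second >= failures_before_skip) {
        result.skipped.push_back(i);  // known bad record: do not retry it
        continue;
      }
      try {
        process(records[i]);
        ++result.processed;
      } catch (const std::exception&) {
        ++failure_count[i];  // remember which record the attempt died on
        failed = true;
        break;               // abandon this attempt and re-execute the task
      }
    }
    if (!failed) return result;
  }
}
```

Because each record can fail only a bounded number of times before it is skipped, the loop terminates even when a record fails deterministically, which is the case that plain re-execution cannot handle.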
The MapReduce implementation relies on an in-house cluster management system that is responsible for distributing and running user tasks on a large collection of shared machines. Though not the focus of this paper, the cluster management system is similar in spirit to other systems such as Condor [16].
The sorting facility that is a part of the MapReduce library is similar in operation to NOW-Sort [1]. Source machines (map workers) partition the data to be sorted and send it to one of R reduce workers. Each reduce worker sorts its data locally (in memory if possible). Of course, NOW-Sort does not have the user-definable Map and Reduce functions that make our library widely applicable.
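The partition-then-sort-locally pattern described above can be sketched in a single process. The function name RangePartitionAndSort and the use of explicit split points are assumptions made for illustration; the split points define R = splits.size() + 1 key ranges, one per reduce worker.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Partitions keys into R = splits.size() + 1 buckets by key range (the role
// of the map workers), then sorts each bucket independently (the role of
// each reduce worker). `splits` must be sorted in ascending order.
// Bucket j receives keys k with splits[j-1] <= k < splits[j].
std::vector<std::vector<std::string>> RangePartitionAndSort(
    const std::vector<std::string>& keys,
    const std::vector<std::string>& splits) {
  std::vector<std::vector<std::string>> buckets(splits.size() + 1);
  for (const std::string& key : keys) {
    // Index of the first split point strictly greater than the key.
    const std::size_t bucket = static_cast<std::size_t>(
        std::upper_bound(splits.begin(), splits.end(), key) - splits.begin());
    buckets[bucket].push_back(key);
  }
  for (std::vector<std::string>& bucket : buckets) {
    std::sort(bucket.begin(), bucket.end());  // local sort per reduce worker
  }
  return buckets;
}
```

With range partitioning, concatenating the buckets in order yields a globally sorted sequence; a hash-based partitioner would only guarantee that each bucket is sorted on its own.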
River [2] provides a programming model where processes communicate with each other by sending data over distributed queues. Like MapReduce, the River system tries to provide good average case performance even in the presence of non-uniformities introduced by heterogeneous hardware or system perturbations. River achieves this by careful scheduling of disk and network transfers to achieve balanced completion times. MapReduce has a different approach. By restricting the programming model, the MapReduce framework is able to partition the problem into a large number of fine-grained tasks. These tasks are dynamically scheduled on available workers so that faster workers process more tasks. The restricted programming model also allows us to schedule redundant executions of tasks near the end of the job which greatly reduces completion time in the presence of non-uniformities (such as slow or stuck workers).
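The effect of dynamically scheduling many fine-grained tasks can be shown with a small simulation. It is a sketch under simplifying assumptions: all tasks are the same size, each worker has a fixed time per task, and the names ScheduleSummary and ScheduleDynamically are hypothetical. Each task goes to whichever worker becomes idle first, so faster workers naturally end up processing more tasks.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

struct ScheduleSummary {
  std::vector<int> tasks_per_worker;  // how many tasks each worker ran
  long long completion_time = 0;      // time at which the last task finished
};

// Simulates pull-based scheduling: the next task always goes to the worker
// that becomes idle earliest (ties go to the lower worker index).
// time_per_task[w] is the time worker w needs for one task.
ScheduleSummary ScheduleDynamically(const std::vector<long long>& time_per_task,
                                    int num_tasks) {
  ScheduleSummary summary;
  summary.tasks_per_worker.assign(time_per_task.size(), 0);
  if (time_per_task.empty()) return summary;

  // Min-heap of (time the worker becomes idle, worker index).
  using Entry = std::pair<long long, std::size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> idle_at;
  for (std::size_t w = 0; w < time_per_task.size(); ++w) {
    idle_at.emplace(0LL, w);
  }

  for (int t = 0; t < num_tasks; ++t) {
    const Entry next = idle_at.top();
    idle_at.pop();
    const std::size_t w = next.second;
    const long long finish = next.first + time_per_task[w];
    ++summary.tasks_per_worker[w];
    if (finish > summary.completion_time) summary.completion_time = finish;
    idle_at.emplace(finish, w);
  }
  return summary;
}
```

With two workers that need 1 and 2 time units per task and six tasks, the faster worker runs four tasks and the job completes at time 4, whereas a static split of three tasks each would leave the slower worker finishing at time 6.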
BAD-FS [5] has a very different programming model from MapReduce, and unlike MapReduce, is targeted to the execution of jobs across a wide-area network. However, there are two fundamental similarities. (1) Both systems use redundant execution to recover from data loss caused by failures. (2) Both use locality-aware scheduling to reduce the amount of data sent across congested network links.
TACC [7] is a system designed to simplify construction of highly-available networked services. Like MapReduce, it relies on re-execution as a mechanism for implementing fault-tolerance.
8 Conclusions
The MapReduce programming model has been successfully used at Google for many different purposes. We attribute this success to several reasons. First, the model is easy to use, even for programmers without experience with parallel and distributed systems, since it hides the details of parallelization, fault-tolerance, locality optimization, and load balancing. Second, a large variety of problems are easily expressible as MapReduce computations. For example, MapReduce is used for the generation of data for Google's production web search service, for sorting, for data mining, for machine learning, and for many other systems. Third, we have developed an implementation of MapReduce that scales to large clusters comprising thousands of machines. The implementation makes efficient use of these machine resources and therefore is suitable for use on many of the large computational problems encountered at Google.
We have learned several things from this work. First, restricting the programming model makes it easy to parallelize and distribute computations and to make such computations fault-tolerant. Second, network bandwidth is a scarce resource. A number of optimizations in our system are therefore targeted at reducing the amount of data sent across the network: the locality optimization allows us to read data from local disks, and writing a single copy of the intermediate data to local disk saves network bandwidth. Third, redundant execution can be used to reduce the impact of slow machines, and to handle machine failures and data loss.
Acknowledgements
Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people's suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper. The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google's engineering organization for providing helpful feedback, suggestions, and bug reports.
References
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10-22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Hölzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22-28, April 2003.
[5] John Bent, Douglas Thain, Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78-91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29-43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401-408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831-838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335-348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68-74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103-111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
A Word Frequency
This section contains a program that counts the number of occurrences of each unique word in a set of input files specified on the command line.
#include "mapreduce/mapreduce.h"

// User's map function
class WordCounter : public Mapper {
 public:
  virtual void Map(const MapInput& input) {
    const string& text = input.value();
    const int n = text.size();
    for (int i = 0; i < n; ) {
      // Skip past leading whitespace
      while ((i < n) && isspace(text[i]))
        i++;

      // Find word end
      int start = i;
      while ((i < n) && !isspace(text[i]))
        i++;
      if (start < i)
        Emit(text.substr(start, i - start), "1");
    }
  }
};
REGISTER_MAPPER(WordCounter);

// User's reduce function
class Adder : public Reducer {
  virtual void Reduce(ReduceInput* input) {
    // Iterate over all entries with the
    // same key and add the values
    int64 value = 0;
    while (!input->done()) {
      value += StringToInt(input->value());
      input->NextValue();
    }

    // Emit sum for input->key()
    Emit(IntToString(value));
  }
};
REGISTER_REDUCER(Adder);

int main(int argc, char** argv) {
  ParseCommandLineFlags(argc, argv);

  MapReduceSpecification spec;

  // Store list of input files into "spec"
  for (int i = 1; i < argc; i++) {
    MapReduceInput* input = spec.add_input();
    input->set_format("text");
    input->set_filepattern(argv[i]);
    input->set_mapper_class("WordCounter");
  }

  // Specify the output files:
  //   /gfs/test/freq-00000-of-00100
  //   /gfs/test/freq-00001-of-00100
  //   ...
  MapReduceOutput* out = spec.output();
  out->set_filebase("/gfs/test/freq");
  out->set_num_tasks(100);
  out->set_format("text");
  out->set_reducer_class("Adder");

  // Optional: do partial sums within map
  // tasks to save network bandwidth
  out->set_combiner_class("Adder");

  // Tuning parameters: use at most 2000
  // machines and 100 MB of memory per task
  spec.set_machines(2000);
  spec.set_map_megabytes(100);
  spec.set_reduce_megabytes(100);

  // Now run it
  MapReduceResult result;
  if (!MapReduce(spec, &result)) abort();

  // Done: 'result' structure contains info
  // about counters, time taken, number of
  // machines used, etc.
  return 0;
}
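The program above depends on the mapreduce/mapreduce.h header, which is internal to Google. For readers who want to experiment with the same logic, the following is a minimal single-process sketch in standard C++. It only emulates the map, group-by-key, and reduce steps in memory and is not the MapReduce library; the function names MapWords and CountWords are hypothetical.

```cpp
#include <cctype>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Map step: emit a (word, 1) pair for each whitespace-delimited word,
// using the same tokenization as WordCounter::Map.
std::vector<std::pair<std::string, long long>> MapWords(
    const std::string& text) {
  std::vector<std::pair<std::string, long long>> emitted;
  const std::size_t n = text.size();
  std::size_t i = 0;
  while (i < n) {
    // Skip past leading whitespace
    while (i < n && std::isspace(static_cast<unsigned char>(text[i]))) ++i;
    // Find word end
    const std::size_t start = i;
    while (i < n && !std::isspace(static_cast<unsigned char>(text[i]))) ++i;
    if (start < i) emitted.emplace_back(text.substr(start, i - start), 1);
  }
  return emitted;
}

// Group intermediate values by key, then reduce each group by summing,
// which mirrors what Adder::Reduce does for one key.
std::map<std::string, long long> CountWords(
    const std::vector<std::string>& documents) {
  std::map<std::string, std::vector<long long>> grouped;
  for (const std::string& doc : documents) {
    for (const auto& kv : MapWords(doc)) {
      grouped[kv.first].push_back(kv.second);
    }
  }
  std::map<std::string, long long> counts;
  for (const auto& entry : grouped) {
    long long sum = 0;
    for (long long v : entry.second) sum += v;
    counts[entry.first] = sum;
  }
  return counts;
}
```

Calling CountWords with the two documents "the quick the" and " fox  the " yields a count of 3 for "the" and 1 each for "quick" and "fox".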