MapReduce: Simplified Data Processing on Large Clusters

MapReduce: Simplified Data Processing on Large Clusters #

主要贡献:提供了一个简单而功能强大的界面,通过该界面提交的大规模计算任务能够自动分布式并行化,且仅需使用大量的商用 PC 上就能达到很高的计算性能。

编程模型 #

  • map:一个由用户编写的函数,将输入转换成一个中间 key/value 值的集合,之后 MapReduce 库将这个结果输送给 reduce 函数。
  • reduce :也是一个由用户编写的函数,接受中间值的 key I ,和各个 map 任务对应产生的 value 值的集合。reduce 会将这些值组合起来,形成一个更小的值集合。通常每个 reduce 函数只产生零或一个输出值。

类型 #

map     (k1, v1)         ->  list(k2,v2)
reduce(k2, list(v2)) ->  list(v2)
  • k1 一般为文档名称(document name)。
  • k2v2 为中间键值对结果(intermediate key/value pairs)。

MapReduce 应用实现样例 #

map reduce
分布式 Grep 程序 遍历行内容是否匹配模式 将 map 输出都拷贝在一起(恒等函数)
计算 URL 访问频率 处理 Web 访问日志,并输出 <URL, 1> 将相同中间 key (URL)的值相加,输出 <URL, total count>
反向网页链接图 输入网页链接 source -> target ,输出 <target, source> 将相同 target 的 source 放在一起组成一个数组,即输出 <target, list(source)>
每个 Host 的词条向量 输入 <word, frequency> 列表的文件,输出 <hostname, term-vector> (hostname 从文档 URL 中获取) 将相同 hostname 的词条向量相加,输出 <hostname, (total) term-vector> 结果
倒排索引 对每个文档进行解析(分词等),输出 <word, document ID> 将中间结果按照 key 对 value 进行聚合,输出 <word, list(document ID)>
分布式 Sort 程序 从记录中提取 key,输出 <key, record> 中间结果对 原封不动地输出结果

实现 #

Google 实现的 MapReduce 库的环境背景:

  1. 服务器大部分为双核 x86 Linux,内存为 2-4G。
  2. 网络使用的是商用网络硬件,一般为 100Mbps-1Gbps 。
  3. 一个集群大约有成百上千台服务器,因此机器失效是正常现象
  4. 存储使用的是较便宜的直接连接在每个服务器上的 IDE 磁盘,由 GFS 来管理数据。
  5. 用户通过调度系统来提交计算作业。每个作业(job)包括了若干个任务(task),这些任务会被调度系统分配到不同的机器中去执行。

计算任务执行流程 #

Map 调用:将用户输入(并行)拆分为 M 个数据块。 Reduce 调用:将中间结果用分割函数(如:hash(key) mod R)来分割为 R 个数据块。

MapReduce 计算执行的过程见下图。 mapreduce-execution-overview

当 MapReduce 任务成功之后,任务的输出将为 R 个文件(每个 reduce 任务输出一个文件)。通常地,用户不需要将 R 个输出文件整合为一个,一般我们直接将它们一起作为下一个 MapReduce 任务的输入(就像 UNIX pipeline 一样,管道模式),或者将其用在能够处理分块输入的程序中。

Master 的数据结构 #

存储每个 map 和 reduce 任务的信息。

  • 任务状态:分为空闲(idle)、运行中(in-progress)以及完成(completed)。
  • 非空闲的 worker 服务器的唯一标识符(ID)。

对于每个 map 任务,master 都需要存储 R 个中间文件的路径和大小。而这个信息会被增量推送至拥有状态为运行中 reduce 任务的 worker 上。

容错(Fault Tolerance) #

应对 Worker 失效 #

master 周期性地 ping 所有的 worker 节点来验活。若在一定时间内,master 收不到来自某个 worker 的响应,则认为该 worker 已经失效。所有被分配到该 worker 上的已完成和运行中的 map 任务都需要重新回到“空闲”状态(idle),再由调度器去安排其他 worker 重做 这些 map 任务;而对于 reduce 任务,只需要将处于“运行中”状态(in-progress)的 reduce 任务重新退回“空闲”状态即可(调度器同样会重新安排这些 reduce 任务重做)。

对于使用了重做的 map 任务的 reduce 任务来说,当 map 任务重做后,所有正在进行 reduce 任务的 worker 都会收到这个 map 任务被重做的信息。因此若某个 reduce 任务需要使用该 map 任务的结果,它将会重新从新 worker 上读取结果数据。

关于 worker 失效后 map 和 reduce 任务重做状态不同的原因如下:

  • 由于 reduce 任务需要直接从 map 任务的机器上读取 map 的结果,而当 worker 失效后,已完成的 map 任务结果也将无法获取。因此该 worker 上的运行过的 map 任务(即状态为 “运行中” 和 “已完成” )都必须重新安排运行。

  • 已完成的 reduce 任务已经将结果写入全局文件系统(如 GFS)中,并不是将结果缓存在本地,因此不必再次运行这些任务。

应对 master 失效 #

在以上提到的 master 数据结构,将 master 内数据状态定期进行快照存储。若 master 发生失效,则新建一个新的 master 实例,并从最近的快照检查点进行数据恢复。此时 worker 向 master 发送的请求将失败,worker 将检查当前情况,并根据自己的策略重试 MapReduce 操作。

失效发生的语义 #

(更合适的应该是:失效定义,见 https://en.wikipedia.org/wiki/Failure_semantics

用户自定义的 map 和 reduce 函数若为确定函数,则 MapReduce 运行的结果应该要与串行执行结果相同。MapReduce 依赖于对 map 和 reduce 任务的原子提交(atomic commit)来实现这点。原子性提交实现:

  • 对于 map 任务而言,当 map 任务完成后,将会产生 R 个结果文件(提供给 R 个 reduce 任务)。worker 会将这 R 个文件的位置上报给 master,并通知其已经完成该 map 任务。若 master 同意获知该任务完成,将记录下这 R 个文件的位置;而若 master 发现这个 map 任务早已被其他 worker 完成,则直接忽略这条信息。
  • 对于 reduce 任务而言,当 reduce 任务完成后,将产生一个临时结果文件,并将其重命名为最终输出文件。若 reduce 任务在多个机器上被完成,就可能发生最终输出文件被多次重命名的问题。MapReduce 中依赖于文件系统的保证的原子重命名操作(atomic rename operation),即相同的目标重命名只能发生一次。

大部分情况下,map 和 reduce 操作都是具有确定性的。这种情况下我们可以将语义等同于串行操作,这样非常易于程序员理解 MapReduce 程序的执行逻辑。

而当 map 和 reduce 操作不具有确定性时,我们仍然提供程度较弱但合理的语义。 这段没看懂

本地化(Locality) #

网络带宽是一个非常稀缺的资源。因此 MapReduce 尽所能来使得数据不需要被传输,MapReduce 的调度器会从全局文件系统(如 GFS)获取输入文件的元信息,尽可能将 map 任务安排在已经有文件部分副本的机器上直接运行(这样能够直接读取本地磁盘来获取数据,而不需要通过网络传输大量数据)。若无法直接安排在有数据块的机器上,则会寻找与其数据目标块机器在同一个交换机下的(同一个网络节点)机器来运行 map 任务,尽可能降低系统整体的网络负载。

任务分片(Task Granularity) #

map 阶段分了 M 个任务片,reduce 阶段分了 R 个任务片。 理想情况下,M 和 R 都应该远远大于 worker 机器的数量。

实践中的限制:master 必须要做出 O(M+R) 个调度任务决定,且要在内存中存储 O(M*R) 个状态信息(每个完成的 map 任务都有 R 个文件信息)。

R 一般由用户来指定,在具体实践中,我们一般设置 M 的依据是其能将输入文件的分片大小刚好在 16-64 MB 范围内(这样文件本地化的效果最优);R 一般选用一个对于 worker 机器数量的较小的乘数,见下面的例子。

M = 200000
R = 5000
#workers = 2000

M/#workers = 100
R/#workers = 2.5

备份任务(Backup Tasks) #

在现实中,经常发现“掉队者”(straggler)会导致整个 MapReduce 任务的运行用时增加。掉队者一般是需要长时间才能运行完一个或少量的 map 或 reduce 计算任务的节点。硬件上和系统上的问题经常会导致集群中出现“掉队者”,比如一个机器上的磁盘出现问题,总是需要不停地做数据错误纠正,就可能导致其读性能从 30MB/s 下降为 1MB/s。

因此我们在集群控制器上配置了“备份任务“这一策略。当控制器发现某些 map 或 reduce 任务在一个设置的时间内还没完成且整个 MapReduce 任务马上就要接近尾声,就会把这个没完成的任务再分发一份给另外的一个 worker 运行,这称为“备份任务“。当主任务和备份任务任意一个完成后,控制器就认为这个计算任务已经完成。

这个机制很有效地提高了计算集群的资源利用率,能在极大减少大规模 MapReduce 计算任务的用时。在 Google 的实践中,当“备份任务”策略被关闭后,集群需要多耗时 44% 才能完成相同计算任务。

优化 #

  1. 分区函数:用户最后需要得到 R 个结果文件,中间结果到第 r 个结果文件的映射,就是这个分区函数完成的了。默认的分区函数为一个简单的哈希函数(如 hash(key) mod R)。经过测试,这个简单的函数也能将结果分布的比较均匀。然而,在一些场景下,用户希望得到的结果里有一些特殊要求(比如在 URL 数目输出中,我们会希望同一个域名的 URL 都尽量输出在一个文件中)。这时用户就可以自定义分区函数了,如 hash(Hostname(urlkey)) mod R ,就能够满足如上要求。
  2. 顺序保证:MapReduce 将保证在同一个分区内,中间结果和最终结果的 key 都是顺序排序的。
  3. 组合(Combiner)函数:由于每个 Map 函数会产生很大的中间结果,为了缩减中间结果在网络中传输的开销,MapReduce 提供了可让用户自定义 Combiner 函数,Combiner 函数大部分情况下都应该与 Reduce 函数实现一致。它的作用是在 Map 任务完成后,在当前 worker 本地做一次部分合并结果的操作。这样能大大提高整个 MapReduce 计算任务的速度。
  4. 输入/输出类型:默认内置“text”文本类型,在文本模式下,MapReduce 认为输入中每一行都是一个 key-value 对(key 是该行的行数,value 则是该行的文本内容)。每种输入方式都能够使得 MapReduce 了解如何拆分输入数据给不同的 map 任务。若用户需要自定义输入格式,MapReduce 提供了 reader 接口,用户通过实现 reader 接口即可自定义不同的输入数据(甚至是读取来自于数据库、映射内存上的数据等)。
  5. 副作用:用户在 map/reduce 过程中会生成额外的输出,MapReduce 依赖于应用写入器(application writer)来保证这些操作的原子性和幂等性,MapReduce 本身并不提供原子的二阶段提交(atomic two-phase commits)来保证这些文件都是由同一个任务输出的。因此,用户的 map/reduce 函数的输出必须具有确定性(即相同输入能够得到相同的结果)。
  6. 跳过坏记录(bad records):用户的 MapReduce 代码可能会包含难以解决的 bug,而由于种种原因(bug 是由第三方库引入的),这个 bug 难以被修复。而在大多数场景下,少量的结果丢失是可以被接受的(比如在进行大数据量的数据分析时)。因此 MapReduce 提供了一个检测错误机制,当某些特定的输入总是会引起 map/reduce 运行崩溃,这些记录被称之为坏记录(bad records),master 会将其记录下来,让 worker 在接下来的任务中跳过这些坏记录。这样整个 MapReduce 计算任务能够得以继续运行。
  7. 本地执行:在大规模 MapReduce 计算任务中调试具体错误是非常困难的一件事情。因此本文提供了一个串行的 MapReduce 库供用户调试使用。它能够将所有的 map/reduce 任务在一台本地机器上执行完成,且用户可使用其认为有用的调试测试工具(如 gdb)直接接入调试。
  8. 状态信息:master 节点上提供了一个 HTTP 服务,用户可以通过其看到当前 MapReduce 的集群状态,计算任务运行进度以及计算任务产生的标准输入输出。这样能够帮助用户评估计算用时以及在计算过慢时推测是哪个任务拖慢了速度。
  9. 计数器:MapReduce 提供了计数器 Counter 的功能。这个计数器能够自动处理分布式环境下的计数一致性问题,也能处理相同的任务被多次执行导致计数器重复增加的问题。原理是:用户初始化一个计数器,在 map/reduce 中使用该计数器的操作。当计数器的操作被运行后,worker 会通过周期性的将计数器附带在 master 发过来的 ping 的响应中,将计数信息同步给 master 节点。

Paper: https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf

More on: https://github.com/Triple-Z/paper-reading/issues/1