1acd7808ffeaad44edcf2bb735d2573e
漫谈分布式系统(17) -- 决战 Shuffle

IO 的重灾区 Shuffle

上一篇,我们讲了大量 IO 导致了 MapReduce 性能不佳。然后引出了基于内存的分布式计算框架 Spark 和 基于内存的分布式存储中间件 Alluxio,来减少 IO。

这两个框架确实很大程度上减轻了 IO 压力,提升了性能。但上篇介绍的重点都比较粗粒度,我们可以进一步深入到执行过程中去,看看还有没有优化的可能。

上篇文章也给过这个 MapReduce 的执行过程图。很明显,大量的 IO 都集中在 shuffle 过程中。

所以今天,我们就一起了解下 MapReduce 和 Spark 这两种典型的分布式计算框架中的 shuffle 过程。

为了方便理解,下面的描述,我们在典型的 map -> reduce 流程下,尽量统一采用 Spark 的术语。

粗略地,我们可以把程序的执行过程分为三个环节。

  • map 阶段负责读取和解析数据,
  • shuffle 阶段负责把对应的数据分发给相应的 reducer
  • 而 reduce 阶段则做汇总属于自己的数据并做最终业务逻辑处理。

既然要优化 IO,就要带上文件看,并且我们重点关注 shuffle 阶段,于是可以画出如下这样的图:

整个 shuffle 阶段我们又拆分为两个步骤:

  • shuffle write 是指 mapper 把处理好的数据写到本地磁盘,一般会以 reduce 好处理的形式组织。
  • shuffle read 是指 reducer 把分散在各个 mapper 的数据读取到本地并合并(这里没有再细分 copy 等步骤)。

由于 shuffle read 在 shuffle write 之后,相对被动,上游写了多少文件、怎么写的,下游就只能相应去处理。所以我们重点关注 shuffle write。

Hash Shuffle

shuffle,顾名思义,是要把数据打散,然后分别分发给对应的下游。

所以关键就在于为每个下游独立输出数据,也就是把给每个 reducer 的数据写在一起。

很自然地,能想到每个 mapper 都为每个 reducer 生成一个文件,再把对应数据都写进去。也就是所谓 Hash Shuffle。

Spark 在 0.8 版本引入 Hash Shuffle,并在 1.2 版本之前都将其作为默认的 Shuffle 方式。

考虑到 shuffle read 是以 partition 为单位拉取数据的,其实更彻底的是每个 mapper 为每个 partition 生成一个文件,但那样文件就太多了。我们完全可以把同一个 reducer 处理的所有 partition 都写到一个文件。

既然只关注 shuffle write,我们可以像看地图一样,滚动鼠标滑轮,看看局部一台机器上的 mapper 是怎么把输出结果写到文件的。

如上图所示,在一台机器上,分配了 2 个 Executor,每个 Executor 会先后执行两个 map 任务。所有 map 都把处理结果写到本地硬盘上。

这样一来,map 阶段结束的时候,就会在本地硬盘上存在 M*R 个文件,其中 M 是 Mapper 的数量,R 是 Reducer 的数量。

整体上看,如果一个大任务需要 40K 个 map 和 20K 个 reduce 处理,那将会产生 8 亿个文件。

再到一个集群的视角,同时跑很多这样的任务,所有机器的磁盘压力和网络压力都会变得不可接受。

必须要优化。

回想下系列第 5 篇讲的,我们做分布式系统的两个目的之一,就是为了提升计算的并行度,算得更快。

但计算资源是有限的,所以并不是一下子把所有 mapper 和 reducer 都跑起来,而是有限的并发,同时跑一部分,完了再跑另一部分,直到跑完。

而如上图所示,我们的每个 Executor 虽然都会跑两个 map,但任一具体时刻,肯定是只有一个在跑。所以,完全可以让这些 map 共用文件,而下游是以分区为单位拉取和处理数据的,所以要以分区为单位共用文件。

如上图,我们可以称之为 Consolidated Hash Shuffle

由 Executor 维护一个文件池,池里为每个 reducer 都打开一个文件。每个 map 执行时,都获取这些文件,并写入数据,执行完后,再交回这些文件。然后下一个 map 启动,重复上面的动作。

这种情况下,本地硬盘上的文件数量就减少为 P*R,其中 P 是并发任务数。对单台机器而言,可以用这台机器的 CPU Core 数量 C 除以每个 map 需要的核数 T 得到,即 P = C/T。

Spark 在 0.8.1 为 Hash Shuffle 引入 consolidation 机制,使其不至于由于性能问题很快被淘汰。

Sort Shuffle

Consolidated Hash Shuffle 确实起到了一定优化效果,把最初的 M*R 中的 M 降为了计算并行度,使得文件数和 map 数不再挂钩

但是,当 reduce 数量很大时,文件数还是很多。有没有可能像 map 数那样,把文件数和 reduce 数也脱钩呢?

回想上面的图,文件数会和 reduce 有关,是因为我们为每个 reducer 都创建了一个文件。要想打破这个一对一的关系,就只能把多个 reducer 的数据都写到同一个文件。

或者做的彻底点,把所有 reduce 的数据都写到一个文件。

但是这样一来,所有数据都混在一起,就导致了 shuffle read 阶段,不能再像 hash shuffle 那样直接读到自己的数据了。

每个 shuffle read 操作都遍历所有文件肯定是不行的,太低效了。

很自然地,参考数据库的做法,创建个索引,这样就能知道想要的数据在哪里了

但 shuffle read 并不会像数据库那样以 value 作为条件来查询,而是以 partition 为单位拉取数据的,每个 reduce 都可能拉取很多 partition。所以索引最好以 partition 为单位,而不能像 hash shuffle 那样为了减少文件数以 reduce 为单位处理。

既然索引以 partition 为单位组织,那数据也必须保持一致,把同一个 partition 的数据放在一起。

所以,需要一个排序操作,以 partition 为 key,把所有数据按 partitionId 排序后写到唯一的文件中。

如下图,即所谓 Sort Shuffle。

这个时候,map 阶段输出的结果文件数就变成 2M 了,M 个数据文件和 M 个索引文件。

由于涉及排序,没法直接 append 到文件,而内存肯定会放不下,所以会需要把装不下的数据分批 sort 然后 spill 到硬盘。这样会有很多 spill 文件,最后又需要 merge 然后 sort 成一个文件。

另外,如果考虑到 reduce 阶段的处理,要把同一个 key 的数据聚合在一起。如果同一个 key 的数据到处分散,只能在内存中一直缓存以便收集齐所有数据再计算,尤其是 avg() 这类操作。即便实时聚合 (avg 也可以通过保存 (sum,count) 实时聚合),所有 key 的中间结果集也会迅速占用大量内存,很容易 OOM。

top Created with Sketch.