F30c5449a962543d506ca84d5843a34c
漫谈分布式系统(16) -- 搞定 worker 的性能问题

MapReduce 的性能问题

上一篇,我们提到,分布式系统中,经常存在中心化的角色,随着集群规模的扩大,这些中心化的角色会达到性能瓶颈,使得系统不能持续横向扩展下去。

然后,我们通过 Federation 等方法解决了这些中心化的 master 的性能问题。

但这不由得提醒我们,除了 master 外,slave/worker 是不是也可能出现性能问题呢?哪怕算不上瓶颈,是不是还有性能提升的空间?

再次检视下我们之前提到最多的 HDFS 和 MapReduce,从架构上看,作为 master 的 NameNode 和 ResourceManager 也能扩展后,作为 slave/worker 的 DataNode 和 NodeManager 本来就能稳定横向扩展。至少架构上没有明显的性能瓶颈了。

架构太粗粒度,再从执行过程上看看,细粒度的个体会不会有性能问题

下图是 MapReduce 的执行过程。

联想到我们在传统的 web 和数据库领域,经常排查分析后,得出的性能瓶颈大都在 IO 上。我们也重点关注下 MR 过程中的 IO 操作。

大致有这么几步涉及 IO:

  • map 阶段,读取文件数据给 map function 处理,处理结果写到一个环形缓存,达到阈值后会以分区为单位拆分、排序,再 spill to disk。这里涉及磁盘 IO。
  • map 阶段的每次 spill 操作都会产生一个文件,所以最后会通过几轮的 merge,来保证最后每个分区只有一个排序好的文件。这里涉及磁盘 IO。
  • shuffle 阶段,需要从各个 mapper 把对应数据 copy 到对应 reducer。这里涉及网络 IO。
  • 如果从 mapper 读到的数据很小,就会先放到缓存,达到阈值后再 merge 然后 spill to disk;如果从 mapper 读到的数据很大,就会直接存到磁盘。这里涉及磁盘 IO。
  • reducer 从 mapper copy 数据的同时,会有独立的线程持续对拿到的数据做 merge。最后一轮 merge 做完后,把结果交给 reduce function 处理。这里涉及磁盘 IO。

程序执行时间往往都大量消耗在这些 IO 操作上,尤其是磁盘 IO,非常拖累性能。

MapReduce 框架想了很多办法来优化性能,比如对 map input 做 merge 和 split,比如 Combiner,比如启用对 map output 的压缩等等。

无论怎么优化,仍然会有大量的 IO 操作,尤其是磁盘 IO 操作

另一方面,MapReduce 的编程模型比较简单和死板,稍微复杂点的处理逻辑,就需要多个 MR 任务顺序执行。比如,经典的 Word Count 例子可以算出每个单词出现的次数,但想要知道出现次数最多的单词,就需要再起一个 MR 任务了。

这样一来,一个业务逻辑被硬生生拆成一个个 MR 任务组成 workflow,每个 MR 任务都以前任的 output 作为 input,自己的 output 也将成为后继者的 input。这无疑又大大增加了 IO 操作,进一步拖累了性能。

于是,逐渐有了跳出 MapReduce 去解决问题的想法。

想统治计算世界的 Spark

怎么减轻磁盘 IO 对性能的拖累?放内存啊!几乎是脱口而出吧,在传统架构里随处可见的缓存,已经给了我们足够多的经验。

Apache Spark 最初正是打着「基于内存」的旗号,迅速赢得了口碑,并得到大量应用。

直至今日,Spark 仍然在官网首页挂着上面这样的性能对比图。

Spark 如今已经是一个非常有野心并逐渐兑现自己野心的框架了。Spark 想要做的是基于内存的统一计算框架,上面承接各种典型应用场景,下面对接五花八门的存储系统

当然,这篇文章的主题不是介绍 Spark,所以不会系统介绍。我们先关注性能,在性能优化方面,Spark 也下足了功夫。可以优化的地方也非常多,我们暂时只关注在架构和设计上的两个重点:

  • cache
  • pipelining

首先是 cache,非常直接,把数据缓存在内存上。

Spark 的核心是 RDD(Resilient Distributed Dataset),所有操作都围绕这个基于内存的分布式数据结构展开。

当然,数据量一大,内存肯定就放不下了,就算放得下,也不划算。所以 Spark 支持了多种所谓 StorageLevel:

  • MEMORY_ONLY,全放内存。
  • MEMORY_AND_DISK,优先放内存,放不下的放硬盘。
  • MEMORY_ONLY_SER,序列化后放内存。
  • MEMORY_AND_DISK_SER,序列化后优先放内存,其次硬盘。
  • DISK_ONLY,全放硬盘。
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.,以上设置的二副本形式。
  • OFF_HEAP (experimental),放堆外内存。

其中,序列化是为了减少内存开销,记得使用 Kyro 而不是 Java 原生的序列化库,性能会更好。序列化的代价就是会有处理 CPU 资源和时间开销,但相比 IO 性能的提升就不值得一提了。

rdd.cache()

就这么简单调用一个方法,就能把整个 rdd 放入缓存。

可以看到,Spark 的 cache 是粗粒度的,并不能让 Redis 那样的通用性缓存那样,提供细粒度到具体数据结构的操作。

另一个和 Redis 之类缓存的区别,也必须提醒的是,Spark 里的 cache,并不适合随意使用,什么数据都缓存下,毕竟数据量太大,成本不划算。

所以 cache 一般在两种场景下用得比较多:

  • 交互式的数据处理。
  • 诸如机器学习和深度学习的迭代算法。

这两种场景的共性就是会多次访问同样的数据,这样为了提升性能而付出的成本就能摊平了。这也是我们决定是否 cache 数据的重要依据。

再看 pipelining。

对于复杂的业务逻辑,Spark 更加灵活的编程范式也能更好的处理。不用再像 MR 那样把一个个任务串成串。

Spark 以 shuffle 操作为间隔,把一个 app 拆分成很多 stage。在每个 stage 内,对数据的处理都以 pipeline 的形式进行。

比如 ds.map().filter().map() ,ds 里的每一行数据,都会连续做完 map、filter 和 map 处理,然后才会把数据写到内存或硬盘,而不用像 MR 那样起两个任务去跑,每个任务跑完都需要把数据写到硬盘。

所以,通过 cache 和 pipelining 这两个技术,Spark 就能很大程度上减少 IO 操作,带来性能上的大幅提升。

但是 Spark 的 cache 是 app 级别的,不能在 app 间共享,更不能给其他非 Spark 的程序使用,这无疑限制了它的威力。要想达到这个效果,只能下沉,在文件系统层去做缓存。

想当终极存储代理商的 Alluxio

于是有了 Alluxio(原名 Tachyon)。作为 AMPLab 的同门师兄弟,Alluxio 和 Spark 有着一样的野心。

Alluxio 想做的是基于内存的统一存储系统,上面承接各种分布式计算框架,下面对接五花八门的存储后端。

这里也不会做详尽的介绍,我们只关注和本文主题相关的部分。

统一的文件抽象层

既然想做桥梁和代理,首当其冲,就是要做好统一的抽象。

top Created with Sketch.