双倍提升双倍提升ApacheSpark排序性能排序性能
Cloudera和英特尔公司的工程师们正在通力合作,旨在使Spark shuffle阶段具有更高的可扩展性和稳定性。本文对相关方法的
设计进行了详细描述。
区别常见的Embarrassingly Parallel系统,类似MapReduce和Apache Spark(Apache Hadoop的下一代数据处理引擎)这样
的计算引擎主要区别在于对“all-to-all” 操作的支持上。和许多分布式引擎一样,MapReduce和Spark的操作通常针对的是被分
片数据集的子分片,很多操作每次只处理单个数据节点,同时这些操作所涉及到的数据往往都只存在于这个数据片内。all-to-
all操作必须将数据集看作一个整体,而每个输出结果都可以总结自不同分片上的记录。Spark的groupByKey、sortByKey,还
有reduceByKey这些shuffle功能都属于这方面常见的操作。
在这些分布式计算引擎中,shuffle指的是在一个all-to-all操作中将数据再分割和聚合的操作。显而易见,在实践生产中,我们
在Spark部署时所发现的大多性能、可扩展性及稳定性问题都是在shuffle过程中产生的。
Cloudera和英特尔的工程师们正通力合作以扩展Spark的shuffle,使得shuffle可以更加快速与稳定地处理大量的数据集。
Spark在很多方面相较MapReduce有更多优势,同时又在稳定性与可扩展性上相差无几。在此,我们从久经考验的
MapReduce shuffle部署中吸取经验,以提高排序数据输出的shuffle性能。
在本文中,我们将会逐层解析——介绍目前Spark shuffle的运作实现模式,提出修改建议,并对性能的提高方式进行分析。更
多的工作进展可以于正在进行中的SPARK-2926发现。
Spark目前的运作实现模式
一个shuffle包含两组任务:1. 产生shuffle数据的阶段;2.使用shuffle数据的阶段。鉴于历史原因,写入数据的任务被称
做“map task”,而读取数据的任务被称做“reduce tasks”,但是以上角色分配只局限于单个job的某个具体shuffle过程中。在一
个shuffle中扮演reduce的task,在另一个shuffle中可能就是map了,因为它在前者里面执行的是读取操作,而在后者中执行的
是数据写入任务,并在随后的阶段中被消费。
MapReduce和Spark的shuffle都使用到了“pull”模式。在每个map任务中,数据被写入本地磁盘,然后在reduce任务中会远程
请求读取这些数据。由于shuffle使用的是all-to-all模式,任何map任务输出的记录组都可能用于任意reduce。一个job在map时
的shuffle操作基于以下原则:所有用于同一个reduce操作的结果都会被写入到相邻的组别中,以便获取数据时更为简单。
Spark默认的shuffle实现(即hash-based shuffle)是map阶段为每个reduce任务单独打开一个文件,这种操作胜在简单,但实
际中却有一些问题,比如说实现时Spark必须维持大量的内存消耗,或者造成大量的随机磁盘I/O。此外,如果M和R分别代表
着一个shuffle操作中的map和reduce数量,则hash-based shuffle需要产生总共M*R个数量的临时文件,Shuffle consolidation
将这个数量减至C*R个(这里的C代表的是同时能够运行的map任务数量),但即便是经过这样的修改之后,在运行的reducer
数量过多时还是经常会出现“文件打开过多”的限制。