Spark的shuffle介绍
shuffle简介:在 DAG 阶段以shuffle为界,划分 stage,
上游 stage做 map task,每个maptask将计算结果数据分成多份,每一份对应到下游stage 的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;
下游stage 做reduce task,每个reduce task通过网络拉取上游 stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。
Shuffle版本也随着spark不断进步和优化:
从2.0开始,把 Sort Based Shuffle
和 Tungsten-Sort
全部统一到 Sort Based Shuffle
中,Hash Based Shuffle
退出历史舞台。
目前spark2.1,直接把SortBased Shuffle
的writer分为三种:BypassMergeSortShuffleWriter
,SortShuffleWriter
和 UnsafeShuffle Writer
。
-
BypassMergeSortShuffle Writer:
Hash Shuffle 中的HashShuffle Writer
实现基本一致,唯一的区别在于,map端的多个输出文件会被汇总为一个文件。所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access某个partition 的所有数据。 -
SortShuffleWriter
:会对分区内进行排序或者全局排序。
处理步骤:使用PartitionedAppendOnlyMap
或者PartitionedPairBuffer
在内存中进行排序,排序的K是(partitionld, hash (key))这样一个元组。如果超过内存limit,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionld 时排厅,如宋 partona相同,再根据hash (key)进行比较排序。如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行merge sort,实现全局排序。
最终读取的时候,从整个全局merge后的读取迭代器中读取的数据,就是按照partitionld 从小到大排序的数据,误取过在中使用丹仅州刀区力权,并且记录每个分区文件的起始写入位置,把这些位置数据写入索引文件中。 UnsafeShuffleWriter
:优化部分是在shuffle write进行序列化写入过程中,直接对二进制进行排序,减少了内存消耗,最终只是 partition 级别的排序。
但是这种需要一定条件:对单条记录、shuffle数量有限制,而且不能带有聚合函数。排序实现:利用一个LongArray存储分区 ID、pageNumber、offset in page,并对这个数组排序。每次插入一条 record 到 page 中,就把 partionld + pageNumber + offset in page,作可以迭代器 PackedRecordPointer
为一个元素插入到 LongArray中。要想反向获得 record,
定义的数据结构就是[24 bit partition number][13 bit memory page number][27 bit offset inpage]然后到根据该指针可以拿到真实的record。
Spark为什么快,Spark SQL 一定比 Hive 快吗
Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。
- 消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 persist 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。
- 消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。
- JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。