0
点赞
收藏
分享

微信扫一扫

spark剖析:spark读取parquet文件会有多少个task


文章目录

  • ​​前言​​
  • ​​对象存储问题?​​
  • ​​存储格式问题?​​
  • ​​spark读取parquet格式文件​​
  • ​​大文件切分​​
  • ​​小文件合并​​
  • ​​总结​​
  • ​​思考​​

前言

做大数据开发的肯定都知道小文件的弊端

  • 读取阶段:​​spark​​​在读取文件时会根据文件的数量以及文件的大小来切分文件生成​​task​​​。一般​​task​​​ 数是大于等于文件数的,如果都是小于​​128M​​​的文件就是等于文件数。小文件越多,​​task​​​数越多,单个​​task​​处理的数据量就少,如果并行度不够会导致查询数据过慢。
  • 写入阶段:如果你的数据是存储到类似于​​cos​​​、​​oss​​​、​​s3​​​等对象存储中,​​spark​​​ 最后移动文件的​​rename​​​阶段如果出现大量的小文件,性能低的会让你疯掉(毕竟​​rename​​​ 在这些对象存储底层分两步​​copy​​​ 和​​delete​​​)。最后发现整个​​ETL​​​ 任务的绝大部分时间用来​​rename​

所以为了优化小文件的问题,我们目前对所有的​​ETL​​任务执行结束会输出当前分区的文件数、文件大小。如下图:

spark剖析:spark读取parquet文件会有多少个task_spark


然后让开发者根据该信息在 ​​insert​​​ 之前对数据进行​​REPARTITION​​​来达到控制文件数量的目的。但是最近发现一个问题,某位同学的 ​​spark sql​​​ 任务执行完成后生成了 ​​200​​​个文件,总大小为​​3M​​​附近(如下图),但是在读取的时候 ​​spark​​​生成的 ​​task​​​数只有 ​​8​​​ 个,和我们想象中的​​200​​​个 ​​task​​ 不一致,究竟原因在哪里?

spark剖析:spark读取parquet文件会有多少个task_spark_02

spark剖析:spark读取parquet文件会有多少个task_hive_03

对象存储问题?

由于我们大数据集群是计算存储分离的架构,所有的数据都存储在对象存储上,我首先怀疑是由于我们用的对象存储的原因。所以我特意 ​​copy​​​了一份数据存储到​​HDFS​​​上(​​bi_ods_real.sucx_test1​​​表的 ​​location​​​ 在 ​​HDFS​​)

spark-sql --master yarn -e  "insert overwrite table bi_ods_real.sucx_test1  select /*+ repartition(200) */ * from bi_dw_plan.dws_order_****_order_df where dt='20210607'"

为了和对象存储表文件数一致,在这里加上了​​/*+ repartition(200) */​​​ ,如果不加只有​​8​​个文件

表的文件数量和大小如下图:

spark剖析:spark读取parquet文件会有多少个task_小文件_04

对 ​​default.sucx_test1​​ 进行查询

spark剖析:spark读取parquet文件会有多少个task_spark小文件_05


发现​​task​​​数确实有​​200​​个,难道真的是存储不同的原因?

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

~~~~~~~~~~~​​心​​​~~~~~~~~~~~~​​累​​~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

中间经历了一系列的弯路,就不再赘述。最后我通过查看他们的物理执行计划,发现了一点不同点。

  • 对于​​HDFS​​表
  • spark剖析:spark读取parquet文件会有多少个task_spark_06

  • 对于​​cos​​ 表
  • spark剖析:spark读取parquet文件会有多少个task_小文件_07

于是我查看两张表的建表语句发现
​​​HDFS​​​表 的存储格式是​​textfile​

ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://172.0.0.1:8080/usr/hive/warehouse/bi_ods_real.db/sucx_test1'

​cos​​​ 表存储格式为 ​​parquet​

ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'cosn://***/bi/bi_dw_plan/dws_order_****_order_df'

难道是存储格式的问题?

存储格式问题?

于是我又创建了一张 ​​location​​​ 在 ​​HDFS​​​,并且存储格式为 ​​parquet​​​ 的表,发现读取时并行度也是只有 ​​8​​​ 个,于是断定关于上面 ​​spark task​​​ 数的问题和 ​​hdfs、cos​​ 无关,和数据的存储格式有关。

spark读取parquet格式文件

由于本地没有 ​​hadoop​​​ 环境,为了测试 ​​spark​​​ 读取 ​​parquet​​​ 文件如何切分文件,我写了一个读取 ​​parquet​​ 文件的方法

@Test
public void test() {
//创建sparkSession
SparkSession sparkSession = SparkSession.builder()
.master("local[1]")
.getOrCreate();
//读取parquet文件
Dataset<Row> parquet = sparkSession.read().parquet("/Users/scx/part-00199-b7613270-858d-4bc8-ad89-badd66bd9c24-c000.snappy.parquet");
//结果显示
parquet.show(false);
}

通过 ​​debug​​​ 查看堆栈信息发现程序会走到 ​​FilePartition#maxSplitBytes​

maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

解释代码前,先了解几个配置以及变量

  • ​spark.sql.files.maxPartitionBytes​​​ 单个​​partition​​​ 能够读取的最大值,默认为​​128M​​.
  • ​spark.sql.files.openCostInBytes​​​ 为打开文件需要的成本,是一个预估值,。假设打开一个文件需要时间​​n​​​,那么在给定的时间​​n​​​ 范围内能够读取的字节数.默认该值​​4 * 1024 * 1024(4M)​
  • ​spark.default.parallelism​​​ 任务的默认并行度,如果该值没有配置,那么该变量得值和​​spark​​​ 任务的资源配置(总​​totalCoreCount​​​)有关,比如配置​​executor-cores=n executor-nums=m​​​那么该值为​​m*n​​​ ,该值最小为​​2​​​。具体可以进入​​CoarseGrainedSchedulerBackend#defaultParallelism​​ 查看

知道了上面这些值的含义,我们可以查看 ​​maxSplitBytes​​ 方法了。

  • ​defaultMaxSplitBytes​​​ 的值为​​spark.sql.files.maxPartitionBytes​​​,即​​128M​​,-
  • ​openCostInBytes​​​ 的值为​​spark.sql.files.openCostInBytes​​,即4M
  • ​defaultParallelism​​​的值由于我没有配置,所以为集群默认的资源​​8​​​(可以通过查看​​spark-defaults.conf​​计算)

## spark-defaults.conf

  • ​totalBytes​​​ 该值需要注意,其值为:​​所有文件大小+文件数*openCostInBytes​​​,在我这里,通过上面截图可以发现所有文件大小大概为​​3M​​​,文件数为​​200​​​.计算后得知该值为:​​803M​
  • ​bytesPerCore​​​ 该值为​​totalBytes​​​/​​defaultParallelism​​​ 为​​803M/8=100M​

然后进入最后一个函数

.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

代入值后

.min(128M, Math.max(4M, 100M))=100M

所以​​FilePartition#maxSplitBytes​​​ 最终返回值为 ​​100M​

从上面我们获取到了单个文件分片的最大大小为​​100M​​​,回到函数上一层​​DataSourceScanExec#createNonBucketedReadRDD​

private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Array[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
//打开文件的代价,同上4M
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
//上面已经计算过大小为100M
val maxSplitBytes =
FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
//selectedPartitions 指的是我们hive表的partition
val splitFiles = selectedPartitions.flatMap { partition =>
//hive 表partition下的所有文件进行切分
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
//isSplitable一般都为true,除非你使用了某些不支持分割的压缩算法,在我这里对于parquet表并没有指定压缩算法所以为true。
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
//第一步:大文件文件切分
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

//第二步:小文件合并
val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}

上面代码主要是遍历 ​​hive​​ 表指定分区下的文件,然后对文件进行切分。

大文件切分

下面具体看下 ​​PartitionedFileUtil#splitFiles​

splitFiles(
sparkSession: SparkSession,
file: FileStatus,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
//文件是否可以切分
if (isSplitable) {
//如果可以切分则对大文件进行切分,每次的步长为FilePartition#maxSplitBytes获取的100M
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues))
}
}
def getPartitionedFile(
file: FileStatus,
filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts)
}

该方法主要是对单大文件进行切分,如果某个文件文件的大小为250M,则切分成3个​​PartitionedFile​​​。分别为​​100M,101-200M,201-250M​​ 当然,如果文件不可切分,直接返回该文件。

小文件合并

​DataSourceScanExec#createNonBucketedReadRDD​​​ 函数里不只做了大文件切分,还为小文件做了合并,真可谓是对 ​​parquet​​​ 等文件优化到了极致(为啥 ​​text​​​ 格式没这种待遇呢?🤔️)
对于小文件合并就是在​​​FilePartition.getFilePartitions​​ 函数里面做的

getFilePartitions(
sparkSession: SparkSession,
partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition] = {
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
// Copy to a new Array.
//主要是将多个小文件合并成一个大的partition来供task操作
val newPartition = FilePartition(partitions.size, currentFiles.toArray)
partitions += newPartition
}
currentFiles.clear()
currentSize = 0
}
// 同样是4M
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign files to partitions using "Next Fit Decreasing"
//在这里的操作主要是对于小于maxSplitBytes大小文件进行合并,对每个文件合并的时候会加上打开文件的代价
partitionedFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()
partitions
}

总结

其实看到这里,我们就能知道为什么我们总文件数为 ​​200​​​,总文件大小为​​3M​​​ 的 ​​parquet​​ 表生成的task数为8个了。为了加深大家的印象,在这里计算下。

首先​​maxSplitBytes​​​ 的大小为 ​​100M​​​,而我们的 ​​200​​​ 个文件总大小才为​​3M​​​,所以在第一步的大文件切分 ​​PartitionedFileUtil#splitFiles​​​ 直接返回 ​​200​​​个​​PartitionedFile​​​,主要是在第二步的小文件合并。​​3M/200=16​​​K,所以平均每个文件的大小为 ​​16k​​​,相比较于​​maxSplitBytes​​​的 ​​100M​​​,可以忽略不计,我们主要要考虑的是打开文件的代价​​openCostInBytes​​​。所以在​​FilePartition#getFilePartitions​​​ 里大概会有 ​​25​​​ 个文件合并成一个新的​​FilePartition​​​(​​25*openCostInBytes=100M​​​)。
最终形成的​​​FilePartition​​​ 个数为​​200/25=8​​​,该值也就是我们的 ​​task​​ 数

思考

兄弟们,分析完了,开心了。然而学习就要知其然,知其所以然。来思考几个问题

  1. ​Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))​​ 分 区文件的最大大小为什么如此定义?

按照我的理解,如果一个人想要最快的执行完成他的任务,那么他肯定想最大限度的利用集群所有的 ​​core​​​ 的能力,但是我们的​​core​​​个数是有限制的,文件的大小是不可控的。所以此时需要一个​​defaultMaxSplitBytes​​​来控制单个​​core​​​处理的最大文件大小,避免单个core处理的数据量太大导致频繁​​full gc​​​ 或者​​oom​​.

  1. ​PartitionedFileUtil#splitFiles​​​ 已经对大文件进行切分了,为啥下面还要​​FilePartition#getFilePartitions​​对小文件合并?

首先,文件可能由于上游任务reduce task处理的数据量不同生成的文件大小可能不均匀,第二点,大文件切分后可能会生成小文件。综上,需要对小文件进行合并。

  1. ​spark​​​ 为啥不对默认的​​text​​ 格式做小文件合并操作?

这个问题我也很疑问,​​debug​​​后发现​​spark​​​对于 ​​text​​​ 格式表的文件读取会生成​​hadoop rdd​​​,最后使用​​FileInputFormat​​​ 读取。而​​FileInputFormat​​​ 是属于 ​​hadoop-mapreduce​​ 包的类,也就是说spark是无法控制的。

  1. 文件在什么情况下是可分割的(​​isSplitable=true​​)?

一般来说不同格式的非压缩文件都是可分割的,使用了压缩算法后,部分压缩算法不支持分割,比如我们常用的​​snappy​​​,而 ​​lzo​​​ 还是支持的分割的。需要注意的是文件是否可分割也和格式+压缩算法有关,比如 ​​text+snappy​​​ 的文件时不可分割的,但是 ​​parquet+snappy​​ 是可分割的。

扫码关注,获取最新文章

spark剖析:spark读取parquet文件会有多少个task_spark小文件_08


举报

相关推荐

0 条评论