在 Apache Spark 中,stage 是执行作业时的重要执行单元。一个 Spark 作业会被划分为若干个 stage,每个 stage 由一组可以并行执行的任务组成。这种划分主要依赖于 RDD 中的操作类型(窄依赖和宽依赖)。下面我们来讨论 Spark stage 的创建和划分的原理以及代码实现的核心逻辑。
Spark Stage 划分的原理
-
RDD 依赖(窄依赖和宽依赖):
- Spark 中,RDD 可以有两种依赖关系:
- 窄依赖(narrow dependency):父 RDD 的每个分区至多被子 RDD 的一个分区使用,典型的操作如
map、filter等。 - 宽依赖(wide dependency):父 RDD 的每个分区可能被多个子 RDD 的分区使用,典型的操作如
reduceByKey、groupByKey等,这类操作会触发shuffle。
- 窄依赖(narrow dependency):父 RDD 的每个分区至多被子 RDD 的一个分区使用,典型的操作如
- 窄依赖的 RDD 操作可以被划分到同一个
stage中,而宽依赖的 RDD 操作会触发shuffle,导致stage划分。
- Spark 中,RDD 可以有两种依赖关系:
-
DAG(有向无环图):
Spark 的作业会构建一个 RDD 的依赖图(DAG)。这个 DAG 中每个 RDD 的窄依赖操作会被合并成一个stage,宽依赖操作会划分出不同的stage,并在两个stage之间插入shuffle。 -
Stage划分规则:- 每当遇到一个宽依赖(如
reduceByKey、join、groupByKey等),Spark 会创建一个新的stage,并将之前的 RDD 操作划分到一个stage中,形成一个有序的stage执行链。 stage划分的核心任务是:将窄依赖操作尽可能合并到一起,直到遇到需要shuffle的宽依赖操作。
- 每当遇到一个宽依赖(如
Spark Stage 划分的核心代码逻辑
Spark 的 DAG 划分及 stage 划分主要在 DAGScheduler 中实现。DAGScheduler 是 Spark 作业调度的核心组件,负责将逻辑作业(job)划分为多个 stage,并调度这些 stage 执行。
以下是 Spark 3.x 版本中有关 stage 划分的核心逻辑及其简化代码片段。
1. DAGScheduler 类
DAGScheduler 类位于 org.apache.spark.scheduler 包下,它负责管理 RDD 依赖关系并创建 stage。DAGScheduler 会根据 RDD 的依赖图和操作类型,生成任务的 DAG 并划分 stage。
class DAGScheduler(
// 参数略...
) extends Logging {
// stage 列表
private val stages = new HashMap[StageId, Stage]()
// 提交 Job 时触发的函数
def submitJob(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, _) => Unit,
properties: Properties = null): JobWaiter[_] = {
// 根据 RDD 和依赖关系生成最终的 ResultStage
val finalStage = createFinalStage(rdd, partitions, callSite)
// 提交该 stage 执行
submitStage(finalStage)
}
// 创建 ResultStage 和后续的 Stage
private def createFinalStage(
rdd: RDD[_],
partitions: Seq[Int],
callSite: CallSite): ResultStage = {
// 创建该作业的最终的 stage,并递归创建所有依赖的 stage
val finalStage = newStage(rdd, partitions)
finalStage
}
// 递归生成各个 Stage,核心逻辑
private def newStage(rdd: RDD[_], partitions: Seq[Int]): Stage = {
// 检查缓存,避免重复生成 Stage
stages.getOrElseUpdate(rdd.id, {
val shuffleDeps = getShuffleDependencies(rdd)
// 如果存在宽依赖,则要划分为不同的 stage
if (shuffleDeps.nonEmpty) {
val parentStages = shuffleDeps.map { dep =>
newStage(dep.rdd, dep.rdd.partitions.indices)
}
val newStage = new ShuffleMapStage(rdd, parentStages)
stages(newStage.id) = newStage
newStage
} else {
// 如果只有窄依赖,当前操作在同一个 stage 内
val parentStages = getNarrowDependencies(rdd).map { dep =>
newStage(dep.rdd, dep.rdd.partitions.indices)
}
val newStage = new ResultStage(rdd, parentStages)
stages(newStage.id) = newStage
newStage
}
})
}
// 获取 RDD 的 shuffle 依赖(宽依赖)
private def getShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = {
rdd.dependencies.collect {
case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep
}
}
// 获取 RDD 的窄依赖
private def getNarrowDependencies(rdd: RDD[_]): List[Dependency[_]] = {
rdd.dependencies.collect {
case narrowDep: NarrowDependency[_] => narrowDep
}
}
}
2. Stage 划分的基本过程
-
RDD 依赖遍历:通过
newStage函数递归遍历 RDD 的依赖关系,将遇到的每一个shuffle依赖(宽依赖)创建一个新的ShuffleMapStage,而ResultStage则用于最终计算结果。 -
宽依赖处理:当遇到宽依赖(
ShuffleDependency),说明需要进行shuffle,因此要创建一个新的stage。 -
窄依赖处理:当只有窄依赖时,RDD 可以继续合并在当前的
stage中。
3. ShuffleMapStage 和 ResultStage
ShuffleMapStage 和 ResultStage 是 Spark 中两种类型的 Stage:
ShuffleMapStage:处理宽依赖(shuffle),该stage会产生shuffle文件供下游stage使用。ResultStage:最终计算Action(如collect、saveAsTextFile等)结果的stage,是 DAG 中的最后一个stage。
代码流程总结
DAGScheduler在收到作业时,会从最后的Action开始,通过递归函数newStage,根据 RDD 的依赖关系逐步向上遍历。- 当遇到
shuffle依赖时,会将其划分为不同的stage,每个shuffle依赖会产生一个ShuffleMapStage。 - 所有的窄依赖 RDD 操作则合并为一个
stage,在同一个stage中执行。 submitStage负责将划分好的stage发送给 TaskScheduler,TaskScheduler 则进一步调度任务到集群执行。
总结
- 窄依赖操作:操作在同一个
stage中执行,尽可能合并,减少shuffle。 - 宽依赖操作:每个宽依赖会触发新的
stage,并引入shuffle,每个shuffle会将数据重新分布给后续的stage。 DAGScheduler的作用:DAG 调度器负责将 RDD 操作链划分为多个stage,并根据依赖关系生成一个 DAG。










