第31课: Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
一:任务调度与资源调度的区别:
l 任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度;
l 资源调度是指应用程序如何获得资源;
l 任务调度是在资源调度的基础上进行的,没有资源调度那么任务调度就成为了无源之水无本之木!
二:资源调度内幕天机解密
(1)因为Master负责资源管理和调度,所以资源调度的方法shedule位于Master.scala这个类中,当注册程序或者资源发生改变的时候都会导致schedule的调用,例如注册程序的时候:
1.             case RegisterApplication(description,driver) =>
2.             // TODO Prevent repeated registrationsfrom some driver
3.             if (state == RecoveryState.STANDBY) {
4.               // ignore, don't send response
5.             } else {
6.               logInfo("Registering app " +description.name)
7.               val app =createApplication(description, driver)
8.               registerApplication(app)
9.               logInfo("Registered app " +description.name + " with ID " + app.id)
10.             persistenceEngine.addApplication(app)
11.             driver.send(RegisteredApplication(app.id,self))
12.             schedule()
13.           }
(2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等);
进入schedule(),schedule为当前等待的应用程序分配可用的资源。每当一个新的应用程序进来的时候,schedule都会被调用。或者资源发生变化的时候(例如Executor挂掉,Worker挂掉,或者新增加机器),schedule都会被调用。schedule():源码如下:
1.             privatedef schedule(): Unit = {
2.           if (state != RecoveryState.ALIVE) {
3.             return
4.           }
5.           // Drivers take strict precedence overexecutors
6.           val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
7.           val numWorkersAlive =shuffledAliveWorkers.size
8.           var curPos = 0
9.           for (driver <- waitingDrivers.toList) {// iterate over a copy of waitingDrivers
10.           // We assign workers toeach waiting driver in a round-robin fashion. For each driver, we
11.           // start from the lastworker that was assigned a driver, and continue onwards until we have
12.           // explored all aliveworkers.
13.           var launched = false
14.           var numWorkersVisited =0
15.           while (numWorkersVisited< numWorkersAlive && !launched) {
16.             val worker =shuffledAliveWorkers(curPos)
17.             numWorkersVisited += 1
18.             if (worker.memoryFree>= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
19.               launchDriver(worker,driver)
20.               waitingDrivers -=driver
21.               launched = true
22.             }
23.             curPos = (curPos + 1)% numWorkersAlive
24.           }
25.         }
26.         startExecutorsOnWorkers()
27.       }
(3)当前Master必须是Alive的方式采用进行资源的调度,如果不是ALIVE的状态会直接返回,也就是StandbyMaster不会进行Application的资源调用!
1.              if (state != RecoveryState.ALIVE) {
2.             return
3.           }
(4)接下来通过workers.toSeq.filter(_.state == WorkerState.ALIVE)过滤判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作:
1.         valshuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state ==WorkerState.ALIVE))(5)使用Random.shuffle把Master中保留的集群中所有ALIVE级别的Worker的信息随机打乱;Master的schedule()方法中:workers是一个数据结构,打乱workers有利于负载均衡,例如不是以固定的顺序启动launchDriver。WorkerInfo 是Worker注册的时候将信息注册过来。
1.       val workers = newHashSet[WorkerInfo]
2.       …….   
3.        val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))WorkerInfo.scala的源码:
1.        private[spark] class WorkerInfo(
2.           val id: String,
3.           val host: String,
4.           val port: Int,
5.           val cores: Int,
6.           val memory: Int,
7.           val endpoint: RpcEndpointRef,
8.           val webUiAddress: String)
9.         extends Serializable {
我们看一下随机打乱的算法:将Worker的信息传进来,先new出来一个ArrayBuffer,将所有的信息放进去。然后将两个索引位置的内容进行交换。例如:如果有4个Worker,依次分别为第一个Worker至第四个Worker,第一个位置是第1个Worker,第2个位置是第2个Worker,第3个位置是第3个Worker,第4个位置是第4个Worker;通过shuffle以后,现在第一个位置可能是第3个Worker,第2个位置可能是第1个Worker,第3个位置可能是第4个Worker,第4个位置可能是第2个Worker,位置信息打乱。
Random.scala中shuffle方法,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置:
1.        def shuffle[T, CC[X] <:TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]):CC[T] = {
2.           val buf = new ArrayBuffer[T] ++= xs
3.        
4.           def swap(i1: Int, i2: Int) {
5.             val tmp = buf(i1)
6.             buf(i1) = buf(i2)
7.             buf(i2) = tmp
8.           }
9.        
10.         for (n <- buf.length to2 by -1) {
11.           val k = nextInt(n)
12.           swap(n - 1, k)
13.         }
14.      
15.         (bf(xs) ++= buf).result
16.       }(6) Master的schedule()方法中:循环遍历等待启动的Driver,如果是Client模式不需要waitingDrivers等待;如果是Cluster模式,此时Driver会加入waitingDrivers等待列表。
schedule()方法源码:
1.           for (driver <- waitingDrivers.toList) { //iterate over a copy of waitingDrivers
2.             // We assign workers to each waitingdriver in a round-robin fashion. For each driver, we
3.             // start from the last worker that wasassigned a driver, and continue onwards until we have
4.             // explored all alive workers.
5.             var launched = false
6.             var numWorkersVisited = 0
7.             while (numWorkersVisited <numWorkersAlive && !launched) {
8.               val worker =shuffledAliveWorkers(curPos)
9.               numWorkersVisited += 1
10.             if (worker.memoryFree>= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
11.               launchDriver(worker,driver)
12.               waitingDrivers -=driver
13.               launched = true
14.             }
15.             curPos = (curPos + 1)% numWorkersAlive
16.           }
17.         }
当SparkSubmit指定Driver在Cluster模式的情况下,此时Driver会加入waitingDrivers等待列表中,在每个DriverInfo的DriverDescription中有要启动Driver时候对Worker的内存及Cores是要求等内容:
1.        private val waitingDrivers = newArrayBuffer[DriverInfo]
2.       ……DriverInfo包括启动时间、ID、描述信息、提交时间等内容:
DriverInfo.scala源码:
1.        private[deploy] class DriverInfo(
2.           val startTime: Long,
3.           val id: String,
4.           val desc: DriverDescription,
5.           val submitDate: Date)
6.         extends Serializable {其中的DriverInfo的DriverDescription描述信息中包括jarUrl、内存、Cores、supervise、command等内容,如果在Cluster模式中,指定supervise为True,那么在Driver挂掉的时候自动重启。
DriverDescription.scala源码:
1.          private[deploy]case class DriverDescription(
2.           jarUrl: String,
3.           mem: Int,
4.           cores: Int,
5.           supervise: Boolean,
6.           command: Command) {在符合资源要求的情况下然后采用随机打乱后的一个Worker来启动Driver,worker是Master中对Worker的一个描述。
Master.scala的launchDriver方法:
1.             private def launchDriver(worker: WorkerInfo,driver: DriverInfo) {
2.           logInfo("Launching driver " +driver.id + " on worker " + worker.id)
3.           worker.addDriver(driver)
4.           driver.worker = Some(worker)
5.           worker.endpoint.send(LaunchDriver(driver.id,driver.desc))
6.           driver.state = DriverState.RUNNING
7.         }Master通过worker.endpoint.send(LaunchDriver发指令给Worker,让远程的Worker启动Driver,Driver启动以后,Driver的状态就变成DriverState.RUNNING。
(7)先启动Driver才会发生后续的一切的资源调度的模式。
(8)Spark默认为应用程序启动Executor的方式是FIFO的方式,也就是所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配的基础上才能够满足下一个应用程序资源的分配;
Master的schedule()方法中,调用startExecutorsOnWorkers(),为当前的程序调度和启动Worker的Executor,默认情况下是排队的方式FIFO。
startExecutorsOnWorkers的源码如下:
1.         privatedef startExecutorsOnWorkers(): Unit = {
2.           // Right now this is a very simple FIFOscheduler. We keep trying to fit in the first app
3.           // in the queue, then the second app, etc.
4.           for (app <- waitingApps if app.coresLeft> 0) {
5.             val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
6.             // Filter out workers that don't haveenough resources to launch an executor
7.             val usableWorkers =workers.toArray.filter(_.state == WorkerState.ALIVE)
8.               .filter(worker => worker.memoryFree>= app.desc.memoryPerExecutorMB &&
9.                 worker.coresFree >=coresPerExecutor.getOrElse(1))
10.             .sortBy(_.coresFree).reverse
11.           val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
12.      
13.           // Now that we'vedecided how many cores to allocate on each worker, let's allocate them
14.           for (pos <- 0 untilusableWorkers.length if assignedCores(pos) > 0) {
15.             allocateWorkerResourceToExecutors(
16.               app,assignedCores(pos), coresPerExecutor, usableWorkers(pos))
17.           }
18.         }
19.       }(9)为应用程序具体分配Executor之前要判断应用程序是否还需要分配Core,如果不需要则不会为应用程序分配Executor;
startExecutorsOnWorkers中的coresLeft 是请求的requestedCores和可用的 coresGranted的相减值。例如如果整个程序要求1000个Cores,但是目前集群可用的只有100个Cores,如果coresLeft不为0,就放入等待队列中;如果coresLeft是0那么就不需要调度。
1.         private[master]def coresLeft: Int = requestedCores - coresGranted(10)Master.scala的startExecutorsOnWorkers中,具体分配Executor之前要对要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序产生计算资源由大到小的usableWorkers数据结构:
1.             val usableWorkers =workers.toArray.filter(_.state == WorkerState.ALIVE)
2.               .filter(worker => worker.memoryFree>= app.desc.memoryPerExecutorMB &&
3.                 worker.coresFree >=coresPerExecutor.getOrElse(1))
4.               .sortBy(_.coresFree).reverse
5.       val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)然后是调用scheduleExecutorsOnWorkers, 在FIFO的情况下默认是spreadOutApps来让应用程序尽可能多的运行在所有的Node上:
1.           private val spreadOutApps =conf.getBoolean("spark.deploy.spreadOut", true)
scheduleExecutorsOnWorker中,minCoresPerExecutor 每个Executor最小分配的core个数。scheduleExecutorsOnWorker源码如下:
1.          private def scheduleExecutorsOnWorkers(
2.             app: ApplicationInfo,
3.             usableWorkers: Array[WorkerInfo],
4.             spreadOutApps: Boolean): Array[Int] = {
5.           val coresPerExecutor =app.desc.coresPerExecutor
6.           val minCoresPerExecutor =coresPerExecutor.getOrElse(1)
7.           val oneExecutorPerWorker =coresPerExecutor.isEmpty
8.           val memoryPerExecutor =app.desc.memoryPerExecutorMB
9.           val numUsable = usableWorkers.length
10.         val assignedCores = newArray[Int](numUsable) // Number of cores to give to each worker
11.         val assignedExecutors =new Array[Int](numUsable) // Number of new executors on each worker
12.         var coresToAssign =math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
13.     ……
(11) 为应用程序分配Executors有两种方式,第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的更好的数据本地性;第二种方式,尝试运行在尽可能少的Worker上。
(12)具体在集群上分配Cores的时候会尽可能的满足我们的要求:math.min计算最小值: coresToAssig是计算app.coresLeft与可用的Worker中可用的Cores的和的最小值。例如应用程序要求1000个Cores,但整个集群中只有100个Cores,所以只能先分配100个Cores。
scheduleExecutorsOnWorkers方法:
1.            var coresToAssign = math.min(app.coresLeft,usableWorkers.map(_.coresFree).sum)
2.       ……
(13)如果是每个Worker下面只能够为当前的应用程序分配一个Executor的话,每次是分配一个Core!scheduleExecutorsOnWorkers方法:
1.              if (oneExecutorPerWorker) {
2.                   assignedExecutors(pos) = 1
3.                 } else {
4.                   assignedExecutors(pos) += 1
5.                 }
总结一下2种情况:一个情况是尽可能在一台机器上去运行程序的所有功能,另一种情况尽可能在所有的节点上。无论是哪种情况,每次给Executor增加Cores是增加一个,如果是spreadOutApps的方式,循环一轮再下一轮,例如有4个Worker,第一次为每个Executor启动一个线程,第二次循环分配一个线程,第三次循环再分配一个线程......;
scheduleExecutorsOnWorkers方法:
1.                    while(freeWorkers.nonEmpty) {
2.             freeWorkers.foreach { pos =>
3.               var keepScheduling = true
4.               while (keepScheduling &&canLaunchExecutor(pos)) {
5.                 coresToAssign -= minCoresPerExecutor
6.                 assignedCores(pos) +=minCoresPerExecutor
7.        
8.                 // If we are launching one executorper worker, then every iteration assigns 1 core
9.                 // to the executor. Otherwise, everyiteration assigns cores to a new executor.
10.               if(oneExecutorPerWorker) {
11.                 assignedExecutors(pos) = 1
12.               } else {
13.                 assignedExecutors(pos) += 1
14.               }
15.      
16.               // Spreading out anapplication means spreading out its executors across as
17.               // many workers aspossible. If we are not spreading out, then we should keep
18.               // schedulingexecutors on this worker until we use all of its resources.
19.               // Otherwise, justmove on to the next worker.
20.               if (spreadOutApps) {
21.                 keepScheduling =false
22.               }
23.             }
24.           }
回到Master.scala的startExecutorsOnWorkers,现在已经决定每个 worker分配多少个cores ,那进行资源分配:
1.                   for (pos <- 0 untilusableWorkers.length if assignedCores(pos) > 0) {
2.               allocateWorkerResourceToExecutors(
3.                 app, assignedCores(pos), coresPerExecutor,usableWorkers(pos))
4.             }allocateWorkerResourceToExecutors源码如下:
1.            privatedef allocateWorkerResourceToExecutors(
2.             app: ApplicationInfo,
3.             assignedCores: Int,
4.             coresPerExecutor: Option[Int],
5.             worker: WorkerInfo): Unit = {
6.           // If the number of cores per executor isspecified, we divide the cores assigned
7.           // to this worker evenly among theexecutors with no remainder.
8.           // Otherwise, we launch a single executorthat grabs all the assignedCores on this worker.
9.           val numExecutors = coresPerExecutor.map {assignedCores / _ }.getOrElse(1)
10.         val coresToAssign =coresPerExecutor.getOrElse(assignedCores)
11.         for (i <- 1 tonumExecutors) {
12.           val exec = app.addExecutor(worker,coresToAssign)
13.           launchExecutor(worker,exec)
14.           app.state =ApplicationState.RUNNING
15.         }
16.       }
allocateWorkerResourceToExecutors中app.addExecutor增加一个Excutor,记录Executor的相关信息:
1.            private[master] def addExecutor(
2.             worker: WorkerInfo,
3.             cores: Int,
4.             useID: Option[Int] = None): ExecutorDesc= {
5.           val exec = newExecutorDesc(newExecutorId(useID), this, worker, cores,desc.memoryPerExecutorMB)
6.           executors(exec.id) = exec
7.           coresGranted += cores
8.           exec
9.         }回到allocateWorkerResourceToExecutors方法中launchExecutor(worker, exec)启动Executor:
1.         launchExecutor(worker,exec)(14)准备具体要为当前应用程序分配的Executor信息后,Master要通过远程通信发指令给Worker来具体启动ExecutorBackend进程:
launchExecutor方法:
1.         private def launchExecutor(worker:WorkerInfo, exec: ExecutorDesc): Unit = {
2.           logInfo("Launching executor " +exec.fullId + " on worker " + worker.id)
3.           worker.addExecutor(exec)
4.           worker.endpoint.send(LaunchExecutor(masterUrl,
5.             exec.application.id, exec.id,exec.application.desc, exec.cores, exec.memory))
6.(15)紧接着给我们应用程序的Driver发送一个ExecutorAdded的信息
launchExecutor方法:
1.           exec.application.driver.send(
2.             ExecutorAdded(exec.id, worker.id,worker.hostPort, exec.cores, exec.memory))
3.         }










