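1. Background
The previous article, https://blog.51cto.com/u_15327484/7834523, described how the ApplicationMaster requests resources:
- Register the ApplicationMaster.
- Send a resource allocation request.
- Obtain resources and start containers.
- Send allocation requests for the resources still needed.
However, although resources are requested through ApplicationMasterProtocol#allocate, that interface does not itself take resources from the resource pool; it only retrieves allocation results. The actual allocation logic is triggered by NodeManager heartbeats. This article walks through how a NodeManager heartbeat request triggers the ResourceManager's scheduling behavior.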
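2. Heartbeat-related protocol
The ResourceTracker protocol defines three kinds of communication:
- registerNodeManager: the NodeManager registers with the ResourceManager;
- nodeHeartbeat: the NodeManager reports periodically via heartbeat;
- unRegisterNodeManager: the NodeManager unregisters.
Through nodeHeartbeat, the NodeManager reports its resource usage to the ResourceManager, for example the resources currently available, the resources in use, and the resources that have been released. The ResourceManager then calls the scheduler's nodeUpdate method to perform scheduling: it picks suitable application resource requests from the queue and places them on that NodeManager.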
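To make the three calls more concrete, here is a minimal sketch that models the protocol as a plain Java interface with an in-memory implementation. SimpleResourceTracker, InMemoryTracker, the memory-only resource model, and the fixed 1000 ms interval are all hypothetical simplifications for illustration; the real ResourceTracker exchanges request/response records rather than primitive arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the ResourceTracker protocol.
interface SimpleResourceTracker {
    void registerNodeManager(String nodeId, int totalMemoryMb);

    // Returns the interval (ms) the node should wait before its next heartbeat.
    long nodeHeartbeat(String nodeId, int usedMemoryMb);

    void unRegisterNodeManager(String nodeId);
}

// Hypothetical RM-side bookkeeping: tracks total and used memory per node.
class InMemoryTracker implements SimpleResourceTracker {
    private final Map<String, Integer> totalMb = new HashMap<>();
    private final Map<String, Integer> usedMb = new HashMap<>();

    @Override
    public void registerNodeManager(String nodeId, int totalMemoryMb) {
        totalMb.put(nodeId, totalMemoryMb);
        usedMb.put(nodeId, 0);
    }

    @Override
    public long nodeHeartbeat(String nodeId, int usedMemoryMb) {
        if (!totalMb.containsKey(nodeId)) {
            throw new IllegalStateException("unknown node: " + nodeId);
        }
        usedMb.put(nodeId, usedMemoryMb);
        return 1000L; // assumed fixed interval for this sketch
    }

    @Override
    public void unRegisterNodeManager(String nodeId) {
        totalMb.remove(nodeId);
        usedMb.remove(nodeId);
    }

    // Memory the scheduler could still hand out on this node.
    int availableMb(String nodeId) {
        return totalMb.get(nodeId) - usedMb.get(nodeId);
    }

    boolean isRegistered(String nodeId) {
        return totalMb.containsKey(nodeId);
    }
}
```

In this sketch the heartbeat only refreshes the bookkeeping; the point is that the ResourceManager learns about free capacity exclusively through these reports.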
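3. NodeManager sends the heartbeat request
NodeStatusUpdaterImpl is a Service of the NodeManager. When the NodeManager starts, NodeStatusUpdaterImpl starts a new thread that sends heartbeats periodically. The main steps of the heartbeat thread are:
- Get the node status.
- Build the heartbeat request.
- Send the heartbeat request.
The code is as follows: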
protected void startStatusUpdater() {
  statusUpdaterRunnable = new Runnable() {
    @Override
    @SuppressWarnings("unchecked")
    public void run() {
      int lastHeartBeatID = 0;
      while (!isStopped) {
        // Send heartbeat
        try {
          NodeHeartbeatResponse response = null;
          // Get the current status of this NM node
          NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
          // Build the heartbeat request
          NodeHeartbeatRequest request =
              NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                      .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                      .getCurrentKey());
          if (logAggregationEnabled) {
            // pull log aggregation status for application running in this NM
            List<LogAggregationReport> logAggregationReports =
                getLogAggregationReportsForApps(context
                    .getLogAggregationStatusForApps());
            if (logAggregationReports != null
                && !logAggregationReports.isEmpty()) {
              request.setLogAggregationReportsForApps(logAggregationReports);
            }
          }
          // Send the heartbeat request
          response = resourceTracker.nodeHeartbeat(request);
          //get next heartbeat interval from response
          nextHeartBeatInterval = response.getNextHeartBeatInterval();
          updateMasterKeys(response);
          // (omitted)
        }
  // Start the periodic heartbeat thread
  statusUpdater =
      new Thread(statusUpdaterRunnable, "Node Status Updater");
  statusUpdater.start();
}
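The shape of this loop (send a heartbeat, read the next interval from the response, wait, repeat) can be sketched in isolation. The example below is a simplified illustration, not the NodeStatusUpdaterImpl code: Tracker, Response, and run are hypothetical names, and advancing the heartbeat id from the response is an assumption about the part omitted above.

```java
import java.util.ArrayList;
import java.util.List;

class HeartbeatLoopSketch {
    // Hypothetical stand-in for the RM side of the heartbeat call.
    interface Tracker {
        Response heartbeat(int lastResponseId);
    }

    // Hypothetical response: carries the new id and the next interval.
    static final class Response {
        final int responseId;
        final long nextIntervalMs;

        Response(int responseId, long nextIntervalMs) {
            this.responseId = responseId;
            this.nextIntervalMs = nextIntervalMs;
        }
    }

    // Sends `rounds` heartbeats and returns the intervals dictated by the RM.
    static List<Long> run(Tracker tracker, int rounds, boolean reallySleep) {
        List<Long> intervals = new ArrayList<>();
        int lastHeartBeatId = 0;
        for (int i = 0; i < rounds; i++) {
            Response response = tracker.heartbeat(lastHeartBeatId);
            // Assumption: the id is taken from the response for the next round.
            lastHeartBeatId = response.responseId;
            intervals.add(response.nextIntervalMs);
            if (reallySleep) {
                try {
                    Thread.sleep(response.nextIntervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the flag
                    break;
                }
            }
        }
        return intervals;
    }

    public static void main(String[] args) {
        // Toy RM: increments the id and lengthens the interval each round.
        Tracker rm = lastId -> new Response(lastId + 1, 10L * (lastId + 1));
        System.out.println(run(rm, 3, true)); // prints [10, 20, 30]
    }
}
```

Letting the response carry the next interval means the ResourceManager, not the node, controls how often heartbeats arrive.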
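The NM node status is defined in proto. As shown below, it contains node information, container statuses, node health status, and application information. Container and application information are both reported as arrays (repeated fields):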
message NodeStatusProto {
  optional NodeIdProto node_id = 1;
  optional int32 response_id = 2;
  repeated ContainerStatusProto containersStatuses = 3;
  optional NodeHealthStatusProto nodeHealthStatus = 4;
  repeated ApplicationIdProto keep_alive_applications = 5;
}
4. ResourceManager performs scheduling
On the ResourceManager side, ResourceTrackerService#nodeHeartbeat handles the heartbeat request:
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    throws YarnException, IOException {
  // Heartbeat response
  NodeHeartbeatResponse nodeHeartBeatResponse =
      YarnServerBuilderUtils.newNodeHeartbeatResponse(
          getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
          NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
  rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
  .....
  // 4. Send status to RMNode, saving the latest response.
  RMNodeStatusEvent nodeStatusEvent =
      new RMNodeStatusEvent(nodeId, remoteNodeStatus);
  if (request.getLogAggregationReportsForApps() != null
      && !request.getLogAggregationReportsForApps().isEmpty()) {
    nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
  }
  this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
  ......
  return nodeHeartBeatResponse;
}
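The event is then processed in the following order:
- StatusUpdateWhenHealthyTransition handles the RMNodeStatusEvent (event type RMNodeEventType.STATUS_UPDATE).
- The ResourceScheduler handles the SchedulerEventType.NODE_UPDATE event.
This article uses FifoScheduler, one implementation of ResourceScheduler, as the example. It calls nodeUpdate and then assignContainer to allocate resources.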
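The two-step chain above can be pictured with a tiny dispatcher. This is only a sketch under simplifying assumptions: DispatchSketch, its EventType enum, and the string trace are hypothetical, and handlers run synchronously here, whereas the ResourceManager hands events to a dispatcher obtained from rmContext.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class DispatchSketch {
    // Hypothetical event types mirroring the two steps in the text.
    enum EventType { STATUS_UPDATE, NODE_UPDATE }

    private final Map<EventType, Consumer<String>> handlers =
        new EnumMap<>(EventType.class);

    // Records the order in which handlers ran.
    final List<String> trace = new ArrayList<>();

    DispatchSketch() {
        // Node-level handler: records the status, then notifies the scheduler.
        handlers.put(EventType.STATUS_UPDATE, nodeId -> {
            trace.add("node " + nodeId + ": status updated");
            dispatch(EventType.NODE_UPDATE, nodeId);
        });
        // Scheduler-level handler: this is where allocation would happen.
        handlers.put(EventType.NODE_UPDATE, nodeId ->
            trace.add("scheduler: nodeUpdate(" + nodeId + ")"));
    }

    void dispatch(EventType type, String nodeId) {
        handlers.get(type).accept(nodeId);
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        d.dispatch(EventType.STATUS_UPDATE, "nm1");
        d.trace.forEach(System.out::println);
    }
}
```

A single heartbeat therefore fans out into a node-state update first and a scheduler update second, which is why allocation happens on the heartbeat path.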
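Inside assignContainer, FiCaSchedulerApp#allocate is called to allocate the resource. As the code shows, allocate puts the newly created container directly into the newlyAllocatedContainers queue. When the AppMaster calls FifoScheduler#allocate to obtain resources, it is in fact retrieving the containers that FiCaSchedulerApp placed in newlyAllocatedContainers. The code is as follows: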
public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
    Priority priority, ResourceRequest request,
    Container container) {
  try {
    writeLock.lock();
    if (isStopped) {
      return null;
    }
    // Required sanity check - AM can call 'allocate' to update resource
    // request without locking the scheduler, hence we need to check
    if (getTotalRequiredResources(priority) <= 0) {
      return null;
    }
    // Create RMContainer
    RMContainer rmContainer = new RMContainerImpl(container, this
        .getApplicationAttemptId(), node.getNodeID(),
        appSchedulingInfo.getUser(), this.rmContext);
    // Add it to allContainers list.
    // Allocate directly and put the container into newlyAllocatedContainers
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);
    // Update consumption and track allocations
    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
        type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());
    // Update resource requests related to "request" and store in RMContainer
    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
    // Inform the container
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));
    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate: applicationAttemptId="
          + container.getId().getApplicationAttemptId()
          + " container=" + container.getId() + " host="
          + container.getNodeId().getHost() + " type=" + type);
    }
    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
        getApplicationId(), container.getId());
    return rmContainer;
  } finally {
    writeLock.unlock();
  }
}
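The handoff between the two paths can be reduced to a small producer/consumer sketch: the heartbeat path fills a list, and the AM path only drains it. AllocationHandoffSketch, the slot-based capacity model, and the string container names are hypothetical simplifications; only the name newlyAllocatedContainers is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

class AllocationHandoffSketch {
    private final List<String> newlyAllocatedContainers = new ArrayList<>();
    private int pendingRequests;     // outstanding requests from the AM
    private int nextContainerId = 1;

    AllocationHandoffSketch(int pendingRequests) {
        this.pendingRequests = pendingRequests;
    }

    // Heartbeat path: runs when a node reports free capacity.
    // Returns how many containers were assigned on this node.
    synchronized int nodeUpdate(String nodeId, int freeSlots) {
        int assigned = 0;
        while (freeSlots > 0 && pendingRequests > 0) {
            newlyAllocatedContainers.add(
                "container_" + nextContainerId++ + "@" + nodeId);
            freeSlots--;
            pendingRequests--;
            assigned++;
        }
        return assigned;
    }

    // AM path: allocates nothing itself, it only hands over what the
    // heartbeat path has already placed in the list.
    synchronized List<String> allocate() {
        List<String> result = new ArrayList<>(newlyAllocatedContainers);
        newlyAllocatedContainers.clear();
        return result;
    }

    public static void main(String[] args) {
        AllocationHandoffSketch app = new AllocationHandoffSketch(3);
        System.out.println(app.allocate()); // prints [] (no heartbeat yet)
        app.nodeUpdate("nm1", 2);
        System.out.println(app.allocate()); // prints [container_1@nm1, container_2@nm1]
    }
}
```

An AM that calls allocate before any node has sent a heartbeat gets an empty result, which matches the observation that allocate only retrieves allocation results.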
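5. Summary
- When the NodeManager starts, NodeStatusUpdaterImpl#startStatusUpdater starts a thread that periodically sends heartbeats to the ResourceManager.
- In the ResourceManager, the heartbeat is ultimately handled by the nodeUpdate method of the ResourceScheduler implementation. That method allocates resources on the reporting NodeManager, wraps them as containers, and puts them into the newlyAllocatedContainers queue.
- The ApplicationMaster periodically sends resource requests to the ResourceManager, which ultimately executes ResourceScheduler#allocate to fetch the resources allocated to the current app from the newlyAllocatedContainers queue. The ApplicationMaster then sends requests to the NodeManager to start the containers.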