Analysis of the Yarn State Machine Framework

1. Introduction

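The previous article analyzed Yarn's event-driven model. It showed that Yarn handles events with a producer-consumer pattern: events are produced through GenericEventHandler#handle and consumed by custom Handler implementation classes. Consuming an event can change the state of an object in Yarn. Taken together, all the state changes an object can go through form its state machine. This article describes how Yarn's state machine framework is implemented.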

2. What Is a State Machine

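A state machine (short for finite-state automaton) is a mathematical model abstracted from the rules that govern how real-world things operate. Given a state machine, its current state, and an input, the next state can be determined unambiguously. Take an automatic door: if the initial state is closed and the input is "open the door", the next state can be computed. The state transition diagram of the automatic-door state machine is shown below: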

[Figure: state transition diagram of the automatic door]
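A minimal Java sketch makes the definition concrete. It assumes only two states (CLOSED, OPENED) and two inputs (OPEN, CLOSE), and it assumes that an input which does not apply leaves the state unchanged. DoorState, DoorInput and Door are illustrative names, not Yarn classes.

```java
// Illustrative states and inputs for the automatic-door example (not Yarn classes).
enum DoorState { CLOSED, OPENED }

enum DoorInput { OPEN, CLOSE }

final class Door {
    // Transition function: (current state, input) -> next state.
    // Assumption: an input that does not apply to the current state is ignored.
    static DoorState next(DoorState state, DoorInput input) {
        switch (state) {
            case CLOSED:
                return input == DoorInput.OPEN ? DoorState.OPENED : DoorState.CLOSED;
            case OPENED:
                return input == DoorInput.CLOSE ? DoorState.CLOSED : DoorState.OPENED;
            default:
                throw new IllegalStateException("Unknown state: " + state);
        }
    }

    public static void main(String[] args) {
        DoorState s = DoorState.CLOSED;
        s = next(s, DoorInput.OPEN);   // CLOSED --OPEN--> OPENED
        System.out.println(s);
        s = next(s, DoorInput.CLOSE);  // OPENED --CLOSE--> CLOSED
        System.out.println(s);
    }
}
```

The same (state, input) pair always produces the same next state. Yarn keeps these pairs in a lookup table rather than a switch, as the rest of this article shows.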

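In Yarn, the two most important concepts of a state machine are State and Event. Take an application, RMApp. It starts in an initial state. When it handles an event, the Transition class registered for that event type is selected and moves the RMApp from its current state to a target state. Each step an RMApp goes through therefore has the shape source state --> transition --> target state, and the collection of all such steps is its state machine.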

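In Yarn, apps, app attempts, containers, and nodes can all be modeled as state machines:

- RMApp maintains the lifecycle of an Application.
- RMAppAttempt maintains the lifecycle of a single attempt to run it.
- RMContainer maintains the lifecycle of an allocated Container, the smallest unit of allocated resources.
- RMNode maintains the lifecycle of a NodeManager.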

3. Why Design a State Machine

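Any entity can be involved in a large number of events, with many combinations of event types and source states. Without a sound way to organize them, an entity's state transition logic becomes complex and cluttered. Yarn's state machine organizes these transitions so that the transition for a given source state and event type can be found quickly.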

4. When the State Machine Is Used

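A Handler uses a state machine when it consumes events from the event queue, to update the state of the corresponding object. Using the state machine involves two steps: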

  1. Step 1: a Service registers a Handler.
  2. Step 2: the Handler uses the state machine.

4.1 The Service Registers the Handler

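Take the RMApp state machine as an example. The Active ResourceManager service manages the lifecycle of RMApp objects, and the RMApp state machine manages each RMApp's state changes. The Active ResourceManager initialization method, ResourceManager$RMActiveServices#serviceInit, registers the Handler implementation ApplicationEventDispatcher for events of type RMAppEventType. ApplicationEventDispatcher then handles all RMAppEventType events: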

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      // ... omitted
      rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext));
      // ... omitted
    }
  }
}
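Registration can be pictured as a map from an event-type enum class to a handler. The sketch below is a simplified, synchronous stand-in. Event, EventHandler, SimpleDispatcher, AppEventType and AppEvent are hypothetical names used for illustration. Unlike the producer-consumer dispatcher described in the previous article, this sketch has no event queue and no consumer thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal event abstraction: every event exposes an enum type.
interface Event {
    Enum<?> getType();
}

// Hypothetical handler interface, similar in spirit to Yarn's EventHandler.
interface EventHandler<E extends Event> {
    void handle(E event);
}

final class SimpleDispatcher {
    // Key: the enum class that declares the event type (e.g. AppEventType.class).
    private final Map<Class<?>, EventHandler<? extends Event>> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> typeClass, EventHandler<? extends Event> handler) {
        handlers.put(typeClass, handler);
    }

    // Synchronous dispatch (no queue, no thread): find the handler by the type's enum class.
    @SuppressWarnings("unchecked")
    void dispatch(Event event) {
        Class<?> typeClass = event.getType().getDeclaringClass();
        EventHandler<Event> handler = (EventHandler<Event>) handlers.get(typeClass);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + typeClass.getSimpleName());
        }
        handler.handle(event);
    }
}

// Hypothetical event type and event, loosely modelled on RMAppEventType / RMAppEvent.
enum AppEventType { START, KILL }

final class AppEvent implements Event {
    private final AppEventType type;
    private final String appId;

    AppEvent(AppEventType type, String appId) {
        this.type = type;
        this.appId = appId;
    }

    @Override
    public AppEventType getType() { return type; }

    String getAppId() { return appId; }
}

final class DispatcherDemo {
    public static void main(String[] args) {
        SimpleDispatcher dispatcher = new SimpleDispatcher();
        // Step 1: register one handler for every AppEventType event.
        dispatcher.register(AppEventType.class,
                (EventHandler<AppEvent>) e -> System.out.println("handling " + e.getType() + " for " + e.getAppId()));
        // Step 2: events are routed by their type's enum class.
        dispatcher.dispatch(new AppEvent(AppEventType.START, "app_1"));
    }
}
```

Keying on getDeclaringClass() sends every constant of one event-type enum to the same handler. This matches registering ApplicationEventDispatcher once for RMAppEventType.class.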

4.2 The Handler Uses the State Machine

ApplicationEventDispatcher implements the EventHandler interface. Its handle method does not process the RMAppEvent itself. Instead, it forwards the event to the target RMApp:

public static final class ApplicationEventDispatcher implements EventHandler<RMAppEvent> {

  private final RMContext rmContext;

  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    // rmContext holds the RMApp for each appId; the actual object is an RMAppImpl, the implementation of RMApp
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    // ... omitted
    rmApp.handle(event);
    // ... omitted
  }
}

The RMAppEvent is processed by RMAppImpl#handle. RMAppImpl implements the RMApp interface, and RMApp in turn extends the EventHandler interface:

public interface RMApp extends EventHandler<RMAppEvent> {...}

RMAppImpl is therefore itself a Handler. While it handles an event, it uses a state machine to change the RMApp's state:

public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {
    // ... omitted
    this.stateMachine.doTransition(event.getType(), event);
    // ... omitted
  }
}

As RMAppImpl#handle shows, RMAppImpl holds a member variable named stateMachine. This member is the state machine: it processes the event and moves the RMAppImpl object to its new state.
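The two levels of delegation in this section can be sketched as follows: a dispatcher-level handler finds the target object by ID, and the object then updates its own state. App, AppEventDispatcher and the toy transition rule are hypothetical. The write lock mirrors the one RMAppImpl#handle takes around stateMachine.doTransition. The sketch drops events for unknown IDs; that is an assumption, not a description of Yarn's exact behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical states and event types for the sketch.
enum AppState { NEW, SUBMITTED, KILLED }

enum AppEventType { START, KILL }

// Hypothetical application object: it is itself a handler and owns its current state.
final class App {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private AppState state = AppState.NEW;

    void handle(AppEventType type) {
        lock.writeLock().lock();
        try {
            state = transition(state, type); // stand-in for stateMachine.doTransition(...)
        } finally {
            lock.writeLock().unlock();
        }
    }

    AppState getState() {
        lock.readLock().lock();
        try {
            return state;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Toy transition rule; combinations not listed here are rejected.
    private static AppState transition(AppState s, AppEventType t) {
        if (s == AppState.NEW && t == AppEventType.START) return AppState.SUBMITTED;
        if (t == AppEventType.KILL && s != AppState.KILLED) return AppState.KILLED;
        throw new IllegalStateException("Invalid event " + t + " at " + s);
    }
}

// Hypothetical dispatcher-level handler: it only locates the object by ID and forwards.
final class AppEventDispatcher {
    private final Map<String, App> apps = new ConcurrentHashMap<>();

    void add(String appId, App app) {
        apps.put(appId, app);
    }

    void handle(String appId, AppEventType type) {
        App app = apps.get(appId);
        if (app == null) {
            return; // assumption for this sketch: events for unknown IDs are dropped
        }
        app.handle(type);
    }
}
```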

5. State Machine Initialization

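For an object whose state must be maintained, the object's own class usually initializes the state machine. RMAppImpl, for example, is the application object being maintained, and it holds a StateMachineFactory member that builds the state machine. In the code below, StateMachineFactory#addTransition adds state transitions and StateMachineFactory#installTopology creates the state machine: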

public class RMAppImpl implements RMApp, Recoverable {
  private static final StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent> stateMachineFactory =
      new StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent>(RMAppState.NEW)
          .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
          .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition())
          .addTransition(RMAppState.NEW,
              EnumSet.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
                  RMAppState.KILLED, RMAppState.FINAL_SAVING),
              RMAppEventType.RECOVER, new RMAppRecoveredTransition())
          // ... omitted
          .installTopology();
}

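StateMachineFactory is what actually stores the state machine. Its two key member variables are transitionsListNode and stateMachineTable. transitionsListNode is temporary storage for the registered transitions. Those transitions are later taken from transitionsListNode and stored in stateMachineTable. StateMachineFactory is defined as follows: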

final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  private final TransitionsListNode transitionsListNode;
  private Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;
  private STATE defaultInitialState;
  private final boolean optimized;

  public StateMachineFactory(STATE defaultInitialState) {
    this.transitionsListNode = null;
    this.defaultInitialState = defaultInitialState;
    this.optimized = false;
    this.stateMachineTable = null;
  }
  // ... other members omitted
}

5.1 TransitionsListNode: Temporary Storage for Transitions

TransitionsListNode is a linked list. Each node stores one state transition, of type ApplicableTransition:

private class TransitionsListNode {
  final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
  final TransitionsListNode next;

  TransitionsListNode(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition, TransitionsListNode next) {
    this.transition = transition;
    this.next = next;
  }
}
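Every node's next field is final, and new nodes are always added at the head. TransitionsListNode therefore behaves like a persistent (immutable) linked list, and walking it from the head returns the most recently added transition first. The small sketch below uses Strings in place of ApplicableTransition objects. Node, prepend and toList are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Immutable singly linked list node, modelled on TransitionsListNode (hypothetical).
final class Node {
    final String transition; // stands in for an ApplicableTransition
    final Node next;

    Node(String transition, Node next) {
        this.transition = transition;
        this.next = next;
    }

    // Adding never mutates the existing list; it returns a new head.
    static Node prepend(Node head, String transition) {
        return new Node(transition, head);
    }

    // Walk from head to tail, which yields newest-first order.
    static List<String> toList(Node head) {
        List<String> out = new ArrayList<>();
        for (Node cursor = head; cursor != null; cursor = cursor.next) {
            out.add(cursor.transition);
        }
        return out;
    }

    public static void main(String[] args) {
        Node head = prepend(null, "NEW --NODE_UPDATE--> NEW");   // registered first
        head = prepend(head, "NEW --START--> NEW_SAVING");       // registered second
        System.out.println(toList(head));                        // second registration comes first
    }
}
```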

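ApplicableTransition is an interface with four type parameters: OPERAND, STATE, EVENTTYPE, and EVENT. OPERAND is the object being operated on, STATE is the state type (used for both source and target states), EVENTTYPE is the event type, and EVENT is the event itself. The interface declares a single method, apply. When makeStateMachineTable walks the TransitionsListNode list, it calls apply on each node's transition, and apply registers that transition in the final state machine table of the factory passed in as subject.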

private interface ApplicableTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
}

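The concrete implementation of ApplicableTransition is ApplicableSingleOrMultipleTransition. It stores the actual Transition object. Its preState and eventType fields give the source state and event type the transition applies to: when the object is in preState and receives an event of type eventType, this Transition handles the event. The apply method puts this mapping into the state machine table. ApplicableSingleOrMultipleTransition is defined as follows: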

static private class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
    implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
  final STATE preState;
  final EVENTTYPE eventType;
  final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

  ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
    this.preState = preState;
    this.eventType = eventType;
    this.transition = transition;
  }

  @Override
  public void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
    // Look up the inner map for preState in the state machine table
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = subject.stateMachineTable.get(preState);
    if (transitionMap == null) {
      // I use HashMap here because I would expect most EVENTTYPE's to not
      // apply out of a particular state, so FSM sizes would be
      // quadratic if I use EnumMap's here as I do at the top level.
      transitionMap = new HashMap<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
      subject.stateMachineTable.put(preState, transitionMap);
    }
    // Map the event type to its transition in the inner map
    transitionMap.put(eventType, transition);
  }
}
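In effect, apply inserts an entry into a two-level map keyed by (preState, eventType) and creates the inner map on first use. The same logic can be written with Map.computeIfAbsent. St, Ev and TableBuilder are hypothetical, and Strings stand in for Transition objects. computeIfAbsent also treats a key that is mapped to null as absent, which matches the null check above.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical state and event-type enums for the sketch.
enum St { NEW, NEW_SAVING, SUBMITTED }

enum Ev { START, NODE_UPDATE, APP_NEW_SAVED }

final class TableBuilder {
    // Outer map keyed by state, inner map keyed by event type (Strings stand in for transitions).
    final Map<St, Map<Ev, String>> table = new EnumMap<>(St.class);

    // Equivalent of ApplicableSingleOrMultipleTransition#apply.
    void apply(St preState, Ev eventType, String transition) {
        table.computeIfAbsent(preState, s -> new HashMap<>()).put(eventType, transition);
    }

    public static void main(String[] args) {
        TableBuilder b = new TableBuilder();
        b.table.put(St.NEW, null);            // null placeholder, like the entry for defaultInitialState
        b.apply(St.NEW, Ev.START, "saving");  // computeIfAbsent replaces the null with a new inner map
        System.out.println(b.table);
    }
}
```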

The Transition interface declares the method that actually performs the state transition:

private interface Transition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType);
}

The first Transition implementation is SingleInternalArc. It is used when the transition always leads to exactly one post-state:

private class SingleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState, SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) {
    if (hook != null) {
      hook.transition(operand, event);
    }
    return postState;
  }
}

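The second Transition implementation is MultipleInternalArc. Here the post-state is the value returned by the hook, and it must be one of the declared valid post-states. If it is not, an InvalidStateTransitionException is thrown: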

private class MultipleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

  // Fields
  private Set<STATE> validPostStates;
  private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hook

  MultipleInternalArc(Set<STATE> postStates, MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
    this.validPostStates = postStates;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType)
      throws InvalidStateTransitionException {
    STATE postState = hook.transition(operand, event);

    if (!validPostStates.contains(postState)) {
      throw new InvalidStateTransitionException(oldState, eventType);
    }
    return postState;
  }
}
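The difference between the two arcs can be reproduced in a few lines. A single arc runs its hook only for side effects and always returns the fixed postState. A multiple arc lets the hook choose the post-state and then checks it against the declared set. The types below (S, Arc, SingleArc, MultipleArc) are simplified, hypothetical copies. They use java.util.function interfaces for the hooks and IllegalStateException in place of InvalidStateTransitionException.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

// Hypothetical state enum for the sketch.
enum S { NEW, ACCEPTED, FAILED, KILLED }

// Simplified counterpart of the Transition interface.
interface Arc<O, E> {
    S doTransition(O operand, S oldState, E event);
}

// One fixed post-state; the hook (may be null) only performs side effects.
final class SingleArc<O, E> implements Arc<O, E> {
    private final S postState;
    private final BiConsumer<O, E> hook;

    SingleArc(S postState, BiConsumer<O, E> hook) {
        this.postState = postState;
        this.hook = hook;
    }

    @Override
    public S doTransition(O operand, S oldState, E event) {
        if (hook != null) {
            hook.accept(operand, event);
        }
        return postState;
    }
}

// The hook decides the post-state, which must be one of the declared valid states.
final class MultipleArc<O, E> implements Arc<O, E> {
    private final Set<S> validPostStates;
    private final BiFunction<O, E, S> hook;

    MultipleArc(Set<S> validPostStates, BiFunction<O, E, S> hook) {
        this.validPostStates = validPostStates;
        this.hook = hook;
    }

    @Override
    public S doTransition(O operand, S oldState, E event) {
        S postState = hook.apply(operand, event);
        if (!validPostStates.contains(postState)) {
            throw new IllegalStateException("Invalid post state " + postState + " from " + oldState);
        }
        return postState;
    }
}

final class ArcDemo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Arc<StringBuilder, String> kill =
                new SingleArc<StringBuilder, String>(S.KILLED, (sb, ev) -> sb.append("killed by ").append(ev));
        System.out.println(kill.doTransition(log, S.NEW, "user") + " / " + log);

        // The operand here is a hypothetical "number of saved attempts".
        Arc<Integer, String> recover = new MultipleArc<Integer, String>(
                EnumSet.of(S.ACCEPTED, S.FAILED), (attempts, ev) -> attempts > 0 ? S.ACCEPTED : S.FAILED);
        System.out.println(recover.doTransition(0, S.NEW, "recover"));
    }
}
```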

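The two Transition implementations above hold, respectively, a SingleArcTransition hook and a MultipleArcTransition hook, and these hooks contain the actual transition logic. The hook objects passed to StateMachineFactory#addTransition are implementations of these two interfaces.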

SingleArcTransition is defined as follows:

public interface SingleArcTransition<OPERAND, EVENT> {
  // The post-state is fixed, so the method returns void
  public void transition(OPERAND operand, EVENT event);
}

MultipleArcTransition is defined as follows:

public interface MultipleArcTransition<OPERAND, EVENT, STATE extends Enum<STATE>> {
  // The post-state is not known in advance; it is determined by the result of the transition
  public STATE transition(OPERAND operand, EVENT event);
}

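These interfaces have many implementations, for example:
[Figure: examples of SingleArcTransition and MultipleArcTransition implementation classes]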

5.2 stateMachineTable: The State Machine

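stateMachineTable is the state machine itself. Its type is a two-level map, Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>. The key of the outer map is the old (source) state. The key of the inner map is the event type. The value of the inner map is a Transition<OPERAND, STATE, EVENTTYPE, EVENT>, where OPERAND is the object being operated on, STATE is the state type, EVENTTYPE is the event type, and EVENT is the event. The table works like this: an RMAppImpl can be in many old states, and each old state can accept several event types. Given the old state and the type of the event to handle, the table returns the transition for that case. That transition processes the event and also determines the target state.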

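Note: the first-level key is the STATE, so a transition registered for an event type only runs when the object is in that specific source state.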
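Because the outer key is the current state, the same event type can lead to different transitions in different states. An event type that was never registered for a state has no entry for that state at all. A toy table illustrates this. The door states, the inputs and DoorTable are hypothetical, and the post-state stands in for a full Transition object.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical states and inputs for the sketch.
enum Door { CLOSED, OPENED, LOCKED }

enum Input { PUSH, LOCK, UNLOCK }

final class DoorTable {
    // state -> (event type -> post-state); the post-state stands in for a Transition.
    static final Map<Door, Map<Input, Door>> TABLE = new EnumMap<>(Door.class);

    static void add(Door pre, Input ev, Door post) {
        TABLE.computeIfAbsent(pre, k -> new HashMap<>()).put(ev, post);
    }

    static {
        add(Door.CLOSED, Input.PUSH, Door.OPENED);   // PUSH opens a closed door
        add(Door.OPENED, Input.PUSH, Door.CLOSED);   // the same PUSH closes an open door
        add(Door.CLOSED, Input.LOCK, Door.LOCKED);
        add(Door.LOCKED, Input.UNLOCK, Door.CLOSED); // PUSH is not registered for LOCKED
    }

    // Event types that are legal in the given state.
    static Set<Input> legalEvents(Door state) {
        return TABLE.getOrDefault(state, Collections.emptyMap()).keySet();
    }
}
```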

5.3 How StateMachineFactory Builds the State Machine

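With the member variables transitionsListNode and stateMachineTable in mind, we can now follow how the state machine is built. The Handler implementation RMAppImpl registers transitions through StateMachineFactory#addTransition. A SingleArcTransition hook, which has a single post-state, is registered with the first addTransition variant below. A MultipleArcTransition hook, whose post-state is not fixed, is registered with the second variant.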

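5.3.1 Registering a SingleArcTransition Implementation

The variant shown here takes a Set of event types. It loops over the set and calls the single-event overload addTransition(preState, postState, eventType, hook) once per event type. Like the variant in 5.3.2, that overload returns a new StateMachineFactory whose new list node wraps the hook, in this case in a SingleInternalArc: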

public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, STATE postState,
    Set<EVENTTYPE> eventTypes, SingleArcTransition<OPERAND, EVENT> hook) {
  StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null;
  for (EVENTTYPE event : eventTypes) {
    if (factory == null) {
      factory = addTransition(preState, postState, event, hook);
    } else {
      factory = factory.addTransition(preState, postState, event, hook);
    }
  }
  return factory;
}
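The Set<EVENTTYPE> overload is only a convenience loop. It passes the returned factory from one single-event registration to the next, once per event type. Below is a self-contained sketch of the same pattern on a hypothetical immutable Builder (Builder, Ev and the string entries are illustrative). One small design difference: the sketch starts the loop from this, so an empty set returns the unchanged builder. The loop above would return null in that case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical event types for the sketch.
enum Ev { KILL, FAIL, EXPIRE }

// Hypothetical immutable builder: every addTransition call returns a new instance.
final class Builder {
    private final List<String> entries;

    Builder() {
        this(Collections.<String>emptyList());
    }

    private Builder(List<String> entries) {
        this.entries = entries;
    }

    // Single-event overload: a new builder with one more entry.
    Builder addTransition(String pre, String post, Ev event) {
        List<String> copy = new ArrayList<>(entries);
        copy.add(pre + " --" + event + "--> " + post);
        return new Builder(Collections.unmodifiableList(copy));
    }

    // Multi-event overload: one registration per event type, passing the result along.
    Builder addTransition(String pre, String post, Set<Ev> events) {
        Builder factory = this; // starting from this means an empty set returns this, not null
        for (Ev e : events) {
            factory = factory.addTransition(pre, post, e);
        }
        return factory;
    }

    List<String> entries() {
        return entries;
    }

    public static void main(String[] args) {
        Builder b = new Builder().addTransition("RUNNING", "KILLED", EnumSet.of(Ev.KILL, Ev.EXPIRE));
        b.entries().forEach(System.out::println);
    }
}
```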

5.3.2 Registering a MultipleArcTransition Implementation

public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, Set<STATE> postStates,
    EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this,
      new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(
          preState, eventType, new MultipleInternalArc(postStates, hook)));
}

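addTransition calls a private StateMachineFactory constructor. That constructor does not modify the old factory. It creates a new factory whose transitionsListNode is a new node, holding the transition for preState and the event type, placed in front of the old factory's list: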

private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode = new TransitionsListNode(t, that.transitionsListNode);
  this.optimized = false;
  this.stateMachineTable = null;
}
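Since addTransition never mutates the factory it is called on, an intermediate factory can be shared safely. Two different factories can be derived from a common prefix, and their lists share the prefix nodes instead of copying them. Below is a hypothetical miniature of the copy constructor; MiniFactory and ListNode are illustrative names, and Strings stand in for transitions.

```java
// Hypothetical miniature of StateMachineFactory's private copy constructor.
final class MiniFactory {
    // Immutable list node, like TransitionsListNode (Strings stand in for transitions).
    private static final class ListNode {
        final String transition;
        final ListNode next;

        ListNode(String transition, ListNode next) {
            this.transition = transition;
            this.next = next;
        }
    }

    private final ListNode head;

    MiniFactory() {
        this.head = null;
    }

    // Like StateMachineFactory(that, t): a new head node pointing at the old list.
    private MiniFactory(MiniFactory that, String transition) {
        this.head = new ListNode(transition, that.head);
    }

    MiniFactory addTransition(String transition) {
        return new MiniFactory(this, transition);
    }

    int size() {
        int n = 0;
        for (ListNode c = head; c != null; c = c.next) {
            n++;
        }
        return n;
    }

    // True if this factory's list is exactly one node in front of the older factory's list.
    boolean extendsDirectly(MiniFactory older) {
        return head != null && head.next == older.head;
    }
}
```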

5.3.3 Building the State Machine Table

StateMachineFactory#addTransition only builds up the TransitionsListNode list, which is just temporary storage for transitions. StateMachineFactory#installTopology is what actually builds the state machine table:

public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> installTopology() {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}

When this StateMachineFactory constructor is called with optimized set to true, it builds the state machine table from transitionsListNode by calling StateMachineFactory#makeStateMachineTable:

private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, boolean optimized) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode = that.transitionsListNode;
  this.optimized = optimized;
  if (optimized) {
    makeStateMachineTable();
  } else {
    stateMachineTable = null;
  }
}

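StateMachineFactory#makeStateMachineTable pushes the linked list onto a stack. It then pops each element and calls its apply method (the ApplicableTransition implementation) to register the Transition in the state machine table. The head of the list is the most recently added transition, so popping from the stack replays the transitions in the order they were registered: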

private void makeStateMachineTable() {
  Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
      new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

  Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> prototype =
      new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

  prototype.put(defaultInitialState, null);

  // I use EnumMap here because it'll be faster and denser. I would
  // expect most of the states to have at least one transition.
  stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

  for (TransitionsListNode cursor = transitionsListNode; cursor != null; cursor = cursor.next) {
    stack.push(cursor.transition);
  }

  while (!stack.isEmpty()) {
    stack.pop().apply(this);
  }
}
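Two details of makeStateMachineTable are worth reproducing. First, the list is ordered newest first, so pushing it onto a stack and popping replays the registrations in their original order. If the same (state, event type) pair is registered twice, the later registration therefore wins. Second, the EnumMap constructor that copies a plain Map throws IllegalArgumentException for an empty map, because it cannot infer the enum key type; the prototype entry for defaultInitialState avoids that. The sketch below uses hypothetical names (TableMaker, Node, St, Ev) and uses ArrayDeque in place of java.util.Stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical state and event-type enums for the sketch.
enum St { NEW, SUBMITTED, KILLED }

enum Ev { START, KILL }

final class TableMaker {
    // Immutable list node; the head is the most recently registered transition.
    static final class Node {
        final St pre;
        final Ev ev;
        final St post; // the post-state stands in for a Transition object
        final Node next;

        Node(St pre, Ev ev, St post, Node next) {
            this.pre = pre;
            this.ev = ev;
            this.post = post;
            this.next = next;
        }
    }

    static Map<St, Map<Ev, St>> makeTable(St initial, Node head) {
        // EnumMap(Map) needs a non-empty source map to learn the key type.
        Map<St, Map<Ev, St>> prototype = new HashMap<>();
        prototype.put(initial, null);
        Map<St, Map<Ev, St>> table = new EnumMap<>(prototype);

        // The list is newest first; a stack turns it back into registration order.
        Deque<Node> stack = new ArrayDeque<>();
        for (Node c = head; c != null; c = c.next) {
            stack.push(c);
        }
        while (!stack.isEmpty()) {
            Node n = stack.pop();
            // Same effect as apply(); a later registration overwrites an earlier one.
            table.computeIfAbsent(n.pre, k -> new HashMap<>()).put(n.ev, n.post);
        }
        return table;
    }
}
```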

The apply method of ApplicableSingleOrMultipleTransition, the ApplicableTransition implementation, then registers each Transition in the state machine table:

static private class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
    implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
  final STATE preState;
  final EVENTTYPE eventType;
  final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

  ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
    this.preState = preState;
    this.eventType = eventType;
    this.transition = transition;
  }

  @Override
  public void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap =
        subject.stateMachineTable.get(preState);
    if (transitionMap == null) {
      // I use HashMap here because I would expect most EVENTTYPE's to not
      // apply out of a particular state, so FSM sizes would be
      // quadratic if I use EnumMap's here as I do at the top level.
      transitionMap = new HashMap<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
      subject.stateMachineTable.put(preState, transitionMap);
    }
    transitionMap.put(eventType, transition);
  }
}

Once apply has been called for every element on the stack, the state machine is complete, and its transitions can then be looked up and executed.

6. Using the State Machine (Executing State Transitions)

For an EventHandler, calling handle invokes StateMachine#doTransition to perform the state transition. The EventHandler RMAppImpl works exactly this way:

public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {

    this.writeLock.lock();
    try {
      ApplicationId appID = event.getApplicationId();
      final RMAppState oldState = getState();
      // ... omitted
      this.stateMachine.doTransition(event.getType(), event);
      // ... omitted
    } finally {
      this.writeLock.unlock();
    }
  }
}

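StateMachineFactory$InternalStateMachine#doTransition is an intermediate layer that adds listener calls before and after the transition, somewhat like AOP. It calls StateMachineFactory#doTransition to perform the transition and returns the resulting state: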

public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitionException {
  listener.preTransition(operand, currentState, event);
  STATE oldState = currentState;
  currentState = StateMachineFactory.this.doTransition(operand, currentState, eventType, event);
  listener.postTransition(operand, oldState, currentState, event);
  return currentState;
}
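The listener calls wrap the stateless table lookup with before/after hooks, on a stateful object that remembers currentState. The sketch below shows that shape. TransitionListener and ListenedMachine are hypothetical names, and a BiFunction stands in for the factory's table lookup.

```java
import java.util.function.BiFunction;

// Hypothetical listener interface with the same call order as the code above.
interface TransitionListener<S, E> {
    void preTransition(S before, E event);

    void postTransition(S before, S after, E event);
}

// Hypothetical stateful wrapper in the spirit of InternalStateMachine.
final class ListenedMachine<S, E> {
    private final BiFunction<S, E, S> table; // (current state, event) -> next state
    private final TransitionListener<S, E> listener;
    private S currentState;

    ListenedMachine(S initial, BiFunction<S, E, S> table, TransitionListener<S, E> listener) {
        this.currentState = initial;
        this.table = table;
        this.listener = listener;
    }

    synchronized S doTransition(E event) {
        listener.preTransition(currentState, event);
        S oldState = currentState;
        currentState = table.apply(currentState, event); // if this throws, state is unchanged
        listener.postTransition(oldState, currentState, event);
        return currentState;
    }
}
```

As in the code above, if the lookup throws, currentState is not updated and postTransition is not called.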

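StateMachineFactory#doTransition uses the source state and the event type to find the matching Transition implementation, either SingleInternalArc or MultipleInternalArc. These wrap a SingleArcTransition and a MultipleArcTransition respectively, and the implementations of those interfaces are the transition hooks registered by the user. The method then executes the transition. If no transition is registered for the pair, it throws InvalidStateTransitionException: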

private STATE doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitionException {
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = stateMachineTable.get(oldState);
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition = transitionMap.get(eventType);
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);
    }
  }
  throw new InvalidStateTransitionException(oldState, eventType);
}
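Without the generic parameters, the lookup comes down to three steps: get the inner map for the old state, get the transition for the event type, and throw if either is missing. The compact sketch below follows those steps. MiniStateMachine, InvalidTransition, AppState and AppEventType are hypothetical. The NEW --START--> NEW_SAVING entry is borrowed from the RMAppImpl registration shown earlier. Each transition here is a UnaryOperator rather than a Transition.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical enums; NEW --START--> NEW_SAVING mirrors the RMAppImpl registration shown earlier.
enum AppState { NEW, NEW_SAVING }

enum AppEventType { START }

// Hypothetical stand-in for InvalidStateTransitionException.
final class InvalidTransition extends RuntimeException {
    InvalidTransition(Enum<?> state, Enum<?> eventType) {
        super("Invalid event: " + eventType + " at " + state);
    }
}

final class MiniStateMachine<S extends Enum<S>, T extends Enum<T>> {
    // state -> (event type -> transition); same shape as stateMachineTable.
    private final Map<S, Map<T, UnaryOperator<S>>> table;

    MiniStateMachine(Class<S> stateClass) {
        this.table = new EnumMap<>(stateClass);
    }

    // Registers a transition with a fixed post-state (like a SingleInternalArc without a hook).
    MiniStateMachine<S, T> add(S pre, T eventType, S post) {
        table.computeIfAbsent(pre, k -> new HashMap<>()).put(eventType, old -> post);
        return this;
    }

    // Same control flow as StateMachineFactory#doTransition.
    S doTransition(S oldState, T eventType) {
        Map<T, UnaryOperator<S>> transitionMap = table.get(oldState);
        if (transitionMap != null) {
            UnaryOperator<S> transition = transitionMap.get(eventType);
            if (transition != null) {
                return transition.apply(oldState);
            }
        }
        throw new InvalidTransition(oldState, eventType);
    }
}
```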

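What finally runs is the custom transition hook registered by the user, for example RMAppRecoveredTransition. After processing the event, it returns the corresponding application state: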

private static final class RMAppRecoveredTransition implements
    MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

  @Override
  public RMAppState transition(RMAppImpl app, RMAppEvent event) {

    RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
    app.recover(recoverEvent.getRMState());
    // The app has completed.
    if (app.recoveredFinalState != null) {
      app.recoverAppAttempts();
      new FinalTransition(app.recoveredFinalState).transition(app, event);
      return app.recoveredFinalState;
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      // asynchronously renew delegation token on recovery.
      try {
        app.rmContext.getDelegationTokenRenewer()
            .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                BuilderUtils.parseCredentials(app.submissionContext),
                app.submissionContext.getCancelTokensWhenComplete(),
                app.getUser(),
                BuilderUtils.parseTokensConf(app.submissionContext));
      } catch (Exception e) {
        String msg = "Failed to fetch user credentials from application:" + e.getMessage();
        app.diagnostics.append(msg);
        LOG.error(msg, e);
      }
    }

    for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts.entrySet()) {
      app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
          timeout.getKey(), timeout.getValue());
      if (LOG.isDebugEnabled()) {
        long remainingTime = timeout.getValue() - app.systemClock.getTime();
        LOG.debug("Application " + app.applicationId
            + " is registered for timeout monitor, type=" + timeout.getKey()
            + " remaining timeout=" + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds");
      }
    }

    // No existent attempts means the attempt associated with this app was not
    // started or started but not yet saved.
    if (app.attempts.isEmpty()) {
      app.scheduler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      return RMAppState.SUBMITTED;
    }

    // Add application to scheduler synchronously to guarantee scheduler
    // knows applications before AM or NM re-registers.
    app.scheduler.handle(
        new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
            app.applicationPriority, app.placementContext));

    // recover attempts
    app.recoverAppAttempts();

    // YARN-1507 is saving the application state after the application is
    // accepted. So after YARN-1507, an app is saved meaning it is accepted.
    // Thus we return ACCEPTED state on recovery.
    return RMAppState.ACCEPTED;
  }
}

7. Summary

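StateMachineFactory builds a lookup table that maps a source state (preState), then an event type (eventType), to a state transition (Transition). This table organizes a large and varied set of state transitions in a manageable way, and it is the state machine.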
