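1. Introduction
The first article in this series, "Yarn Service Design Principles", explained how YARN handles its many functions. It wraps each function in a Service and uses CompositeService to manage the init/start/stop lifecycle of all child services uniformly. ResourceManager extends CompositeService and follows the same lifecycle. In addition, when ResourceManager starts, the Active ResourceManager is elected through ZooKeeper. This article follows the ResourceManager startup process and uses it to take a close look at the framework that elects the Active ResourceManager.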
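2. Which services does ResourceManager start
ResourceManager provides two kinds of services: always-on services and "active" services (ActiveServices):
- Always-on services run on both ResourceManagers. Examples are AdminService and the EmbeddedElector election service.
- Active services (RMActiveServices) run only on the single Active ResourceManager.
Here is the conclusion about how ActiveServices are initialized, started and managed. The detailed analysis follows later:
ActiveServices are not managed as a child service of ResourceManager. Instead, ResourceManager has extra logic in its init/stop paths that initializes and stops ActiveServices. Starting ActiveServices is driven by EmbeddedElector, which is a child service of ResourceManager. This is shown in the figure below: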
3. Why ActiveServices is not a child service of ResourceManager
ActiveServices may run on only one ResourceManager, so its start logic must differ from ResourceManager's. ResourceManager starts all of its child services unconditionally, and at that point nobody knows yet which ResourceManager will be Active. If ActiveServices were a child service, both ResourceManagers would start it. That clearly violates the requirement that ActiveServices run in exactly one place.
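The toy model below illustrates the argument above. It counts how many ResourceManagers end up running ActiveServices under two designs: starting it as an ordinary child, or starting it only on the election winner. `RmNode` and `ActiveServicesPlacementDemo` are hypothetical names, and the `winnerId` parameter is an assumed stand-in for the outcome of the ZooKeeper election.

```java
import java.util.List;

// Hypothetical model: each RmNode either starts ActiveServices together with its
// other children (asChild = true) or only when told it won the election.
class RmNode {
    final String id;
    final boolean activeServicesAsChild;
    boolean activeServicesRunning = false;

    RmNode(String id, boolean activeServicesAsChild) {
        this.id = id;
        this.activeServicesAsChild = activeServicesAsChild;
    }

    // Like CompositeService#serviceStart: runs before anyone knows who the leader is.
    void start() {
        if (activeServicesAsChild) {
            activeServicesRunning = true;
        }
    }

    // Only the election winner gets this call, mirroring the EmbeddedElector path.
    void onElectedActive() {
        activeServicesRunning = true;
    }
}

class ActiveServicesPlacementDemo {
    // Returns how many of the two RMs end up running ActiveServices.
    static int countRunning(boolean asChild, String winnerId) {
        List<RmNode> rms = List.of(new RmNode("rm1", asChild), new RmNode("rm2", asChild));
        for (RmNode rm : rms) {
            rm.start();
        }
        int running = 0;
        for (RmNode rm : rms) {
            if (rm.id.equals(winnerId)) {
                rm.onElectedActive();
            }
            if (rm.activeServicesRunning) {
                running++;
            }
        }
        return running;
    }

    public static void main(String[] args) {
        System.out.println("as child: " + countRunning(true, "rm1"));         // as child: 2
        System.out.println("election-driven: " + countRunning(false, "rm1")); // election-driven: 1
    }
}
```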
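4. When ActiveServices is started
ResourceManager has an always-on child service, EmbeddedElector, that elects the Active ResourceManager between the two ResourceManagers. Only one ActiveServices instance therefore runs. EmbeddedElector connects to ZooKeeper, and when it starts, the ResourceManagers compete for the same shared lock, an ephemeral znode. The ResourceManager that obtains the lock transitions to Active and starts ActiveServices. The other transitions to Standby and runs only the always-on services. EmbeddedElector also registers a watcher with ZooKeeper. When the lock's state changes, for example because the Active ResourceManager's session goes away, the watcher callback switches the ResourceManager's state.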
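The election described above can be simulated in memory. Below is a minimal sketch, assuming a fake store that holds one ephemeral lock node. `FakeLockStore` and `ElectorSketch` are hypothetical classes, not the ZooKeeper client API. The first `tryCreate` wins, losers register a one-shot watch, and expiring the owner's session deletes the node and lets a standby contend again.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for one ephemeral lock znode. A simulation for illustration
// only, NOT the ZooKeeper client API.
class FakeLockStore {
    private String ownerSession = null;               // session holding the ephemeral node
    private final List<Runnable> deleteWatchers = new ArrayList<>();

    // Like an EPHEMERAL create: fails if the node already exists.
    synchronized boolean tryCreate(String sessionId) {
        if (ownerSession != null) {
            return false;
        }
        ownerSession = sessionId;
        return true;
    }

    // One-shot watch, similar in spirit to a watch set through exists().
    synchronized void watchDelete(Runnable watcher) {
        deleteWatchers.add(watcher);
    }

    // Expiring the owner's session deletes the ephemeral node and fires each pending watch once.
    void expireSession(String sessionId) {
        List<Runnable> toFire;
        synchronized (this) {
            if (!sessionId.equals(ownerSession)) {
                return;
            }
            ownerSession = null;
            toFire = new ArrayList<>(deleteWatchers);
            deleteWatchers.clear();
        }
        toFire.forEach(Runnable::run);   // fired outside the lock so watchers can call back in
    }
}

// Hypothetical elector: the winner of tryCreate becomes ACTIVE, the rest become
// STANDBY and watch the lock so they can contend again when it disappears.
class ElectorSketch {
    final String sessionId;
    final FakeLockStore store;
    String haState = "NEUTRAL";

    ElectorSketch(String sessionId, FakeLockStore store) {
        this.sessionId = sessionId;
        this.store = store;
    }

    void joinElection() {
        if (store.tryCreate(sessionId)) {
            haState = "ACTIVE";    // the real RM would start RMActiveServices here
        } else {
            haState = "STANDBY";   // keep only the always-on services
            store.watchDelete(this::joinElection);
        }
    }

    public static void main(String[] args) {
        FakeLockStore zk = new FakeLockStore();
        ElectorSketch rm1 = new ElectorSketch("session-1", zk);
        ElectorSketch rm2 = new ElectorSketch("session-2", zk);
        rm1.joinElection();
        rm2.joinElection();
        System.out.println(rm1.haState + " / " + rm2.haState);  // ACTIVE / STANDBY
        zk.expireSession("session-1");                           // rm1's session dies
        System.out.println(rm2.haState);                         // ACTIVE
    }
}
```

In this toy, `rm1` keeps reporting ACTIVE after its session expires. The real elector also reacts to its own session expiry by entering neutral mode, as described in section 5.3.2.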
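5. ResourceManager child service initialization & startup
For bringing up the Active ResourceManager, the key initialization/startup steps are:
- EmbeddedElector (election service) initialization
- ActiveServices initialization
- EmbeddedElector (election service) startup
- ActiveServices startup
The corresponding code: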
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
public static void main(String argv[]) {
// omitted
ResourceManager resourceManager = new ResourceManager();
// calls AbstractService#init, which in turn calls ResourceManager#serviceInit
resourceManager.init(conf);
// likewise, AbstractService#start ends up in ResourceManager#serviceStart
resourceManager.start();
// omitted
}
}
5.1 ResourceManager initialization
ResourceManager#init is inherited from AbstractService#init, which eventually calls ResourceManager#serviceInit:
public abstract class AbstractService implements Service {
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
}
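Note: only AbstractService implements init. It calls the subclass's serviceInit (for example ResourceManager's), or AbstractService's default serviceInit if the subclass does not override it. CompositeService does not override init either. It overrides serviceInit, and likewise serviceStart and serviceStop. Its serviceInit initializes every child service by calling that child's init, which goes through the AbstractService#init flow above, and then calls the parent serviceInit. start and stop work the same way.
ResourceManager#serviceInit creates the EmbeddedElector service and adds it as a child service. It then calls ResourceManager#createAndInitActiveServices to create and initialize the standalone ActiveServices: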
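The template-method structure described in the note above can be reduced to a few lines. This is a simplified sketch, not the Hadoop classes. `MiniService` and `MiniCompositeService` are hypothetical, there is no Configuration, listeners or failure handling, and a shared log records the call order. It shows that `init()` is idempotent and that a composite initializes its children before itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractService: init()/start() guard the state and
// delegate to overridable serviceInit()/serviceStart() hooks.
abstract class MiniService {
    enum State { NOTINITED, INITED, STARTED }

    private State state = State.NOTINITED;
    private final String name;
    final List<String> log;   // shared call log, for demonstration only

    MiniService(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    final synchronized void init() {
        if (state == State.INITED) {
            return;           // second init() is a no-op, as in AbstractService#init
        }
        state = State.INITED;
        serviceInit();
    }

    final synchronized void start() {
        if (state == State.STARTED) {
            return;
        }
        state = State.STARTED;
        serviceStart();
    }

    // Default hooks just record the call.
    protected void serviceInit() { log.add("init " + name); }
    protected void serviceStart() { log.add("start " + name); }
}

// Hypothetical stand-in for CompositeService: its hooks forward to every child in order.
class MiniCompositeService extends MiniService {
    private final List<MiniService> children = new ArrayList<>();

    MiniCompositeService(String name, List<String> log) { super(name, log); }

    void addService(MiniService s) { children.add(s); }

    @Override
    protected void serviceInit() {
        for (MiniService s : children) {
            s.init();         // goes through the child's own init() guard
        }
        super.serviceInit();
    }

    @Override
    protected void serviceStart() {
        for (MiniService s : children) {
            s.start();
        }
        super.serviceStart();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniCompositeService rm = new MiniCompositeService("ResourceManager", log);
        rm.addService(new MiniService("AdminService", log) {});
        rm.addService(new MiniService("EmbeddedElector", log) {});
        rm.init();
        rm.init();            // ignored
        rm.start();
        log.forEach(System.out::println);
    }
}
```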
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
protected void serviceInit(Configuration conf) throws Exception {
// omitted
// register always-on services, e.g. AdminService
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
// omitted
if (this.rmContext.isHAEnabled()) {
// If the RM is configured to use an embedded leader elector,
// initialize the leader elector.
if (HAUtil.isAutomaticFailoverEnabled(conf)
&& HAUtil.isAutomaticFailoverEmbedded(conf)) {
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}
// omitted
createAndInitActiveServices(false);
// omitted
}
}
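5.2 Creating the EmbeddedElector election service
ResourceManager picks the EmbeddedElector implementation from the yarn.resourcemanager.ha.curator-leader-elector.enabled setting. If it is true, the implementation is CuratorBasedElectorService, which is built on Curator, a ZooKeeper client framework that wraps the native ZooKeeper API. By default (false), the implementation is ActiveStandbyElectorBasedElectorService, which uses the native ZooKeeper API directly: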
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
protected EmbeddedElector createEmbeddedElector() throws IOException {
EmbeddedElector elector;
curatorEnabled =
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.zkManager = createAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
}
return elector;
}
}
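5.3 Initializing the ActiveStandbyElectorBasedElectorService election service
This article uses ActiveStandbyElectorBasedElectorService to walk through the Active ResourceManager election. During initialization it builds the election znode as zkBasePath + "/" + clusterId, where zkBasePath defaults to /yarn-leader-election. The lock znode ActiveStandbyElectorLock lives under that path, so by default the lock is /yarn-leader-election/<clusterId>/ActiveStandbyElectorLock. The elector service on every ResourceManager tries to create this ephemeral znode. The ResourceManager whose create succeeds goes on to become the Active ResourceManager.
Most importantly, ActiveStandbyElectorBasedElectorService creates an ActiveStandbyElector instance, stored as a member, during initialization: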
public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
protected void serviceInit(Configuration conf) throws Exception {
// omitted
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
// omitted
String rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
// omitted
// create the election object
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
elector.ensureParentZNode();
// omitted
super.serviceInit(conf);
}
}
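ActiveStandbyElector connects to the ZooKeeper server and maintains the watcher. It monitors the lock znode, by default /yarn-leader-election/<clusterId>/ActiveStandbyElectorLock, and switches the ResourceManager's state according to the lock's state.
5.3.1 ZooKeeper connection
The ActiveStandbyElector constructor creates the connection to ZooKeeper: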
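The lock path is assembled in two places: the election service builds the election znode, and the ActiveStandbyElector constructor appends the lock file name. The hypothetical helper below puts both steps together to show the final path. `LockPathSketch` is an illustrative name, and "my-yarn-cluster" is an example value for the cluster id (yarn.resourcemanager.cluster-id).

```java
// Hypothetical helper reproducing the concatenation done by
// ActiveStandbyElectorBasedElectorService#serviceInit (electionZNode) and the
// ActiveStandbyElector constructor (zkLockFilePath).
class LockPathSketch {
    // Default base path for automatic failover.
    static final String DEFAULT_BASE_PATH = "/yarn-leader-election";
    // Lock znode name used by ActiveStandbyElector.
    static final String LOCK_FILENAME = "ActiveStandbyElectorLock";

    static String lockPath(String zkBasePath, String clusterId) {
        String electionZNode = zkBasePath + "/" + clusterId;   // passed as parentZnodeName
        return electionZNode + "/" + LOCK_FILENAME;             // becomes zkLockFilePath
    }

    public static void main(String[] args) {
        // prints /yarn-leader-election/my-yarn-cluster/ActiveStandbyElectorLock
        System.out.println(lockPath(DEFAULT_BASE_PATH, "my-yarn-cluster"));
    }
}
```

Because the cluster id is part of the path, two YARN clusters sharing one ZooKeeper ensemble contend for different locks.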
public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app, int maxRetryNum, boolean failFast) throws IOException, HadoopIllegalArgumentException, KeeperException {
if (app == null || acl == null || parentZnodeName == null
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
throw new HadoopIllegalArgumentException("Invalid argument");
}
zkHostPort = zookeeperHostPorts;
zkSessionTimeout = zookeeperSessionTimeout;
zkAcl = acl;
zkAuthInfo = authInfo;
appClient = app;
znodeWorkingDir = parentZnodeName;
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
this.maxRetryNum = maxRetryNum;
// establish the ZK Connection for future API calls
if (failFast) {
createConnection();
} else {
reEstablishSession();
}
}
reEstablishSession wraps createConnection with retry logic. Here we look directly at ActiveStandbyElector#createConnection:
private void createConnection() throws IOException, KeeperException {
// omitted
zkClient = connectToZooKeeper();
// omitted
}
Note: when ActiveStandbyElectorBasedElectorService constructs ActiveStandbyElector, it passes itself as the ActiveStandbyElectorCallback argument. ActiveStandbyElector stores this argument in its appClient field. appClient is very important and comes up again later in this article.
ActiveStandbyElector#connectToZooKeeper creates the Watcher object that listens to ZooKeeper:
protected synchronized ZooKeeper connectToZooKeeper() throws IOException, KeeperException {
watcher = new WatcherWithClientRef();
// createZooKeeper creates the ZooKeeper client with this watcher registered on it
ZooKeeper zk = createZooKeeper();
watcher.setZooKeeperRef(zk);
// omitted
watcher.waitForZKConnectionEvent(zkSessionTimeout);
// omitted
return zk;
}
WatcherWithClientRef#process receives ZooKeeper events, but the actual handling is done in ActiveStandbyElector#processWatchEvent:
private final class WatcherWithClientRef implements Watcher {
private ZooKeeper zk;
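// counted down when the first event (normally the connection event) arrives; waitForZKConnectionEvent waits on it
private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
// counted down once setZooKeeperRef has been called; events are handled only after that
private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
// ordinary methods omitted
// process is the watcher's ZooKeeper event handler
@Override
public void process(WatchedEvent event) {
// omitted
ActiveStandbyElector.this.processWatchEvent(zk, event);
// omitted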
}
}
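The two latches above form a small handshake between the thread that connects and the thread that delivers events. The sketch below reproduces only that handshake. `LatchedWatcherSketch` is hypothetical: events are plain Strings, the client reference is any Object, "dispatch" just appends to a list, and the 5-second wait is an arbitrary stand-in for zkSessionTimeout. Whether the event or the reference arrives first, the event is dispatched only after the reference is known.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the two-latch handshake in WatcherWithClientRef.
class LatchedWatcherSketch {
    private final CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    private final CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
    private volatile Object zkRef;
    private final List<String> handled = new ArrayList<>();

    void setZooKeeperRef(Object zk) {
        zkRef = zk;
        hasSetZooKeeper.countDown();
    }

    // Event thread: note that an event arrived, then wait for the client reference.
    void process(String event) throws InterruptedException {
        hasReceivedEvent.countDown();
        if (!hasSetZooKeeper.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("client reference was never set");
        }
        synchronized (handled) {
            handled.add(event + " via " + zkRef);
        }
    }

    // Connecting thread: true if an event (normally the connect event) arrived in time.
    boolean waitForConnectionEvent(long timeoutMs) throws InterruptedException {
        return hasReceivedEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    List<String> handledEvents() {
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    // Demo in the same order as connectToZooKeeper: create, set reference, wait.
    static List<String> runDemo() {
        LatchedWatcherSketch w = new LatchedWatcherSketch();
        Thread eventThread = new Thread(() -> {
            try {
                w.process("SyncConnected");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        eventThread.start();
        w.setZooKeeperRef("zk-client-1");
        try {
            if (!w.waitForConnectionEvent(5000)) {
                throw new IllegalStateException("no connection event");
            }
            eventThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return w.handledEvents();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // [SyncConnected via zk-client-1]
    }
}
```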
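5.3.2 Handling ZooKeeper watch events
ActiveStandbyElector#processWatchEvent handles watch events. Connection-state changes (SyncConnected, Disconnected, Expired, SaslAuthenticated) arrive with event type None. Changes to the watched znode (NodeDeleted, NodeDataChanged and other node events) arrive with a non-null path.
The ResourceManager's state is adjusted differently depending on the ZooKeeper state and the event type. The handling logic is as follows: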
public class ActiveStandbyElector implements StatCallback, StringCallback {
private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
// connection-state events (event type None)
if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("Session connected.");
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
if (prevConnectionState == ConnectionState.DISCONNECTED &&
wantToBeInElection) {
monitorActiveStatus();
}
break;
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
// ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
case SaslAuthenticated:
LOG.info("Successfully authenticated to ZooKeeper using SASL.");
break;
default:
fatalError("Unexpected Zookeeper watch event state: "
+ event.getState());
break;
}
return;
}
// events on the watched znode (the lock)
String path = event.getPath();
if (path != null) {
switch (eventType) {
case NodeDeleted:
if (state == State.ACTIVE) {
enterNeutralMode();
}
joinElectionInternal();
break;
case NodeDataChanged:
monitorActiveStatus();
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
}
monitorActiveStatus();
}
return;
}
// some unexpected error has occurred
fatalError("Unexpected watch error from Zookeeper");
}
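The branching in processWatchEvent can be restated as a small, side-effect-free model. `WatchEventModel` and its string inputs are hypothetical simplifications. The real method takes a WatchedEvent and calls the methods directly. This model only returns their names in order, which makes the difference between a first connection and a reconnect easy to check. It assumes node events always carry a non-null path.

```java
import java.util.List;

// Hypothetical model of ActiveStandbyElector#processWatchEvent that returns the
// names of the methods the real code would call, in order.
class WatchEventModel {
    enum Conn { TERMINATED, CONNECTED, DISCONNECTED }

    Conn zkConnectionState = Conn.TERMINATED;
    boolean wantToBeInElection = true;
    boolean active = false;  // whether the elector's own state is ACTIVE

    List<String> onEvent(String eventType, String keeperState) {
        if (eventType.equals("None")) {          // connection-state change
            switch (keeperState) {
                case "SyncConnected": {
                    Conn prev = zkConnectionState;
                    zkConnectionState = Conn.CONNECTED;
                    // Only a reconnect after a disconnect re-checks the lock.
                    return (prev == Conn.DISCONNECTED && wantToBeInElection)
                            ? List.of("monitorActiveStatus")
                            : List.of();
                }
                case "Disconnected":
                    zkConnectionState = Conn.DISCONNECTED;
                    return List.of("enterNeutralMode");
                case "Expired":
                    return List.of("enterNeutralMode", "reJoinElection");
                case "SaslAuthenticated":
                    return List.of();
                default:
                    return List.of("fatalError");
            }
        }
        if (eventType.equals("NodeDeleted")) {   // the lock znode went away
            return active ? List.of("enterNeutralMode", "joinElectionInternal")
                          : List.of("joinElectionInternal");
        }
        return List.of("monitorActiveStatus");   // NodeDataChanged and any other node event
    }

    public static void main(String[] args) {
        WatchEventModel m = new WatchEventModel();
        System.out.println(m.onEvent("None", "SyncConnected"));   // []
        System.out.println(m.onEvent("None", "Disconnected"));    // [enterNeutralMode]
        System.out.println(m.onEvent("None", "SyncConnected"));   // [monitorActiveStatus]
        System.out.println(m.onEvent("NodeDeleted", null));       // [joinElectionInternal]
    }
}
```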
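In short, every branch of ActiveStandbyElector's event handling moves the ResourceManager into the active, standby or neutral state. The transitions are discussed below.
5.3.2.1 Competing for the active state
monitorLockNodeAsync, which monitorActiveStatus calls, checks whether the lock znode exists and re-registers the watcher on it at the same time. The result is delivered asynchronously to the exists callback: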
public class ActiveStandbyElector implements StatCallback, StringCallback {
private void monitorLockNodeAsync() {
monitorLockNodePending = true;
monitorLockNodeClient = zkClient;
zkClient.exists(zkLockFilePath, watcher, this, zkClient);
}
}
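ActiveStandbyElector implements the exists callback, StatCallback#processResult. That callback switches the ResourceManager between active and standby according to who holds the distributed lock: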
public class ActiveStandbyElector implements StatCallback, StringCallback {
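public synchronized void processResult(int rc, String path, Object ctx, Stat stat) {
// omitted: stale-client check, etc.
Code code = Code.get(rc);
if (isSuccess(code)) {
// the lock znode exists: if this ResourceManager's session owns it, become active; otherwise become standby
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
// enter the active state
if (!becomeActive()) {
reJoinElectionAfterFailureToBecomeActive();
}
} else {
// enter the standby state
becomeStandby();
}
return;
}
// the lock znode does not exist: enter neutral mode and try to create the lock
if (isNodeDoesNotExist(code)) {
enterNeutralMode();
// try to create the lock znode again
joinElectionInternal();
return;
}
// omitted
}
}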
5.3.2.2 Handling the neutral state
When the connection to ZooKeeper is lost, the elector enters the NEUTRAL state:
public class ActiveStandbyElector implements StatCallback, StringCallback {
private void enterNeutralMode() {
if (state != State.NEUTRAL) {
state = State.NEUTRAL;
appClient.enterNeutralMode();
}
}
}
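ActiveStandbyElector#enterNeutralMode calls enterNeutralMode on its appClient field. The runtime type of appClient is ActiveStandbyElectorBasedElectorService, so the method that actually runs is ActiveStandbyElectorBasedElectorService#enterNeutralMode. In the neutral state, the ResourceManager has lost contact with ZooKeeper and does not know whether it is still the leader, so it does not switch immediately. Instead, it starts a timer and transitions to standby only if nothing cancels the timer within zkSessionTimeout. For example, becomeActive cancels it through cancelDisconnectTimer: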
public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
public void enterNeutralMode() {
// omitted (logging, synchronization)
zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
zkDisconnectTimer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (zkDisconnectLock) {
becomeStandby();
}
}
}, zkSessionTimeout);
}
// omitted
}
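Below is a minimal sketch of this grace period, assuming the RM was ACTIVE when the connection dropped. It swaps java.util.Timer for a ScheduledExecutorService. `DisconnectTimerSketch`, its `generation` counter and `runDemo` are hypothetical additions. The counter keeps a stale timer from firing after it has been cancelled and replaced.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the neutral-mode grace period: drop to STANDBY only if the timer is
// not cancelled within sessionTimeoutMs.
class DisconnectTimerSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "zk-disconnect-timer");
                t.setDaemon(true);   // do not keep the JVM alive
                return t;
            });
    private final Object lock = new Object();
    private ScheduledFuture<?> pending;  // guarded by lock
    private long generation = 0;         // guarded by lock; identifies the current timer
    volatile String haState = "ACTIVE";  // assumed state when the connection dropped

    void enterNeutralMode(long sessionTimeoutMs) {
        synchronized (lock) {
            if (pending != null) {
                return;                  // a timer is already running
            }
            final long myGeneration = ++generation;
            pending = scheduler.schedule(() -> {
                synchronized (lock) {
                    // Skip if this timer was cancelled (or replaced) before it fired.
                    if (pending != null && generation == myGeneration) {
                        haState = "STANDBY";
                        pending = null;
                    }
                }
            }, sessionTimeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    void cancelDisconnectTimer() {
        synchronized (lock) {
            if (pending != null) {
                pending.cancel(false);
                pending = null;
            }
        }
    }

    // Demo helper: enter neutral mode, optionally cancel at once, wait, report state.
    static String runDemo(long timeoutMs, boolean cancelEarly, long waitMs) {
        DisconnectTimerSketch s = new DisconnectTimerSketch();
        s.enterNeutralMode(timeoutMs);
        if (cancelEarly) {
            s.cancelDisconnectTimer();   // e.g. the session came back and the RM won again
        }
        try {
            Thread.sleep(waitMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        s.scheduler.shutdownNow();
        return s.haState;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(50, false, 300));   // STANDBY
        System.out.println(runDemo(200, true, 400));   // ACTIVE
    }
}
```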
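5.3.2.3 Handling session expiry
When the session expires, the elector enters neutral mode and rejoins the election. reJoinElection closes the old connection. joinElectionInternal then establishes a new session and tries to create the lock znode again: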
public class ActiveStandbyElector implements StatCallback, StringCallback {
private void reJoinElection(int sleepTime) {
sessionReestablishLockForTests.lock();
try {
terminateConnection();
sleepFor(sleepTime);
if (appData != null) {
joinElectionInternal();
}
} finally {
sessionReestablishLockForTests.unlock();
}
}
}
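Consider the very first connection to ZooKeeper. The initial connection state is ConnectionState.TERMINATED. When the client establishes a session with the server, it receives the SyncConnected state with event type Event.EventType.None. processWatchEvent then only records zkConnectionState = CONNECTED. The previous state was TERMINATED rather than DISCONNECTED, so monitorActiveStatus is not called and the switch simply breaks. In other words, once the first connection succeeds, the client takes no further action. The election itself starts later, through joinElection (section 5.5).
5.4 ActiveServices initialization
During ResourceManager initialization, an extra method call initializes ActiveServices. ActiveServices is not a child service of ResourceManager, so ResourceManager's own init/start/stop lifecycle does not cover it: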
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
protected void serviceInit(Configuration conf) throws Exception {
// omitted
createAndInitActiveServices(false);
// omitted
}
}
ResourceManager#createAndInitActiveServices creates activeServices and runs its initialization:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}
}
activeServices is of type RMActiveServices. Initializing it creates its child services and adds them:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
public class RMActiveServices extends CompositeService {
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
// omitted
// placeholder: each active service is created and added as a child in the same way
xxxService = createXxxService();
addService(xxxService);
// omitted
super.serviceInit(conf);
}
}
}
5.5 Starting the election service
Once ActiveStandbyElectorBasedElectorService has been initialized, every ResourceManager has created its zkClient and connected to the ZooKeeper server. Starting EmbeddedElector follows this call chain:
ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync. ActiveStandbyElector#createLockNodeAsync tries to acquire the lock that makes a ResourceManager active:
public class ActiveStandbyElector implements StatCallback, StringCallback {
private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
}
}
createLockNodeAsync calls ZooKeeper#create to try to acquire the distributed lock, that is, to create the ephemeral lock znode, and thereby become Active:
public class ZooKeeper {
public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
{
// omitted
}
}
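StringCallback is an asynchronous callback. After the client sends the create request, the server's response arrives asynchronously, and the client handles it in StringCallback#processResult.
Note: in the ZooKeeper client, both Watcher and AsyncCallback are asynchronous callbacks, but they fire at different times. A Watcher is triggered by an event the server sends. An AsyncCallback is invoked by the client itself when the response to a request it issued arrives.
ActiveStandbyElector implements the callback interface that ZooKeeper provides for the create call above. When create completes, ActiveStandbyElector#processResult runs asynchronously: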
public class ActiveStandbyElector implements StatCallback, StringCallback {
public synchronized void processResult(int rc, String path, Object ctx,
String name) {
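// omitted: stale-client check, Code code = Code.get(rc); etc.
if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
// try to enter the Active state
if (becomeActive()) {
// watch the lock znode (exists + watcher) to keep tracking the leader
monitorActiveStatus();
} else {
// otherwise quit and rejoin the election, i.e. try to create the lock znode again
reJoinElectionAfterFailureToBecomeActive();
}
return;
}
// the create failed because the node exists: on the first attempt another RM holds the lock, so become standby
if (isNodeExists(code)) {
if (createRetryCount == 0) {
becomeStandby();
}
// after a retry the node may be our own; monitorActiveStatus verifies the ephemeral owner
monitorActiveStatus();
return;
}
// transient errors (e.g. connection loss): retry the create up to maxRetryNum times
if (shouldRetry(code)) {
if (createRetryCount < maxRetryNum) {
++createRetryCount;
createLockNodeAsync();
return;
}
// omitted
}
}
}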
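The create callback's branches, including the retry counter, can be traced with a small model. `CreateResultModel` is hypothetical, and its four result codes simplify ZooKeeper's KeeperException codes. It assumes becomeActive() succeeds. What happens after maxRetryNum is exhausted is elided above, so the model records it as fatalError, which is an assumption. The key subtlety is that NODEEXISTS after a retry does not trigger becomeStandby, because the node may have been created by our own earlier, unacknowledged attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the StringCallback#processResult branches for the lock create.
class CreateResultModel {
    enum Rc { OK, NODEEXISTS, CONNECTIONLOSS, OTHER }

    final int maxRetryNum;
    int createRetryCount = 0;
    final List<String> actions = new ArrayList<>();  // elector methods the real code would call

    CreateResultModel(int maxRetryNum) {
        this.maxRetryNum = maxRetryNum;
    }

    void onCreateResult(Rc rc) {
        switch (rc) {
            case OK:                       // we created the lock znode: we are the leader
                actions.add("becomeActive");
                actions.add("monitorActiveStatus");
                break;
            case NODEEXISTS:
                // First attempt: another RM holds the lock. After a retry the node may be
                // ours, so only monitor and let the exists callback compare the owner.
                if (createRetryCount == 0) {
                    actions.add("becomeStandby");
                }
                actions.add("monitorActiveStatus");
                break;
            case CONNECTIONLOSS:           // stands in for every code where shouldRetry(code) is true
                if (createRetryCount < maxRetryNum) {
                    createRetryCount++;
                    actions.add("createLockNodeAsync");
                } else {
                    actions.add("fatalError");   // assumed outcome once retries are exhausted
                }
                break;
            default:                       // other error codes, not modelled in detail
                actions.add("fatalError");
        }
    }

    public static void main(String[] args) {
        CreateResultModel m = new CreateResultModel(1);
        m.onCreateResult(Rc.CONNECTIONLOSS); // transient error: retry the create
        m.onCreateResult(Rc.NODEEXISTS);     // may be our own node: no becomeStandby
        System.out.println(m.actions);       // [createLockNodeAsync, monitorActiveStatus]
    }
}
```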
5.6 Starting ActiveServices
In the normal case, ActiveStandbyElector#becomeActive puts the ResourceManager into the active state:
public class ActiveStandbyElector implements StatCallback, StringCallback {
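private boolean becomeActive() {
// omitted: return true if already ACTIVE; fence the old active and write the breadcrumb znode
try {
appClient.becomeActive();
state = State.ACTIVE;
return true;
} catch (Exception e) {
// the caller quits and rejoins the election
return false;
}
}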
}
appClient is the ActiveStandbyElectorBasedElectorService instance that was passed in when the ActiveStandbyElector object was constructed:
public class ActiveStandbyElectorBasedElectorService extends AbstractService
implements EmbeddedElector,
ActiveStandbyElector.ActiveStandbyElectorCallback {
public void becomeActive() throws ServiceFailedException {
cancelDisconnectTimer();
try {
// req is a StateChangeRequestInfo field of this class (declaration omitted)
rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
}
}
AdminService#transitionToActive moves the current ResourceManager to the Active state:
public class AdminService extends CompositeService implements HAServiceProtocol, ResourceManagerAdministrationProtocol {
public synchronized void transitionToActive(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
// omitted
rm.transitionToActive();
// omitted
}
}
Internally, AdminService calls ResourceManager#transitionToActive, which in turn calls ResourceManager#startActiveServices:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
synchronized void transitionToActive() throws Exception {
if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active state");
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
startActiveServices();
return null;
} catch (Exception e) {
reinitialize(true);
throw e;
}
}
});
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
LOG.info("Transitioned to active state");
}
}
ResourceManager#startActiveServices is what actually starts ActiveServices:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
void startActiveServices() throws Exception {
if (activeServices != null) {
clusterTimeStamp = System.currentTimeMillis();
activeServices.start();
}
}
}
5.7 Switching to standby
When the ResourceManager loses the election, it transitions to standby. If it was active at that point, it also stops RMActiveServices and reinitializes:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
synchronized void transitionToStandby(boolean initialize) throws Exception {
if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby state");
HAServiceState state = rmContext.getHAServiceState();
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices();
reinitialize(initialize);
}
LOG.info("Transitioned to standby state");
}
}
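The two transition methods can be reduced to a small state model to make their idempotence and asymmetry visible. `HaStateModel` and its counters are hypothetical. They replace the real startActiveServices, stopActiveServices and reinitialize calls, failures are not modelled, and INITIALIZING is an assumed starting state.

```java
// Hypothetical model of ResourceManager#transitionToActive / #transitionToStandby:
// both are idempotent, and only an ACTIVE -> STANDBY switch stops ActiveServices.
class HaStateModel {
    enum HaState { INITIALIZING, ACTIVE, STANDBY }

    HaState state = HaState.INITIALIZING;  // assumed starting state
    int activeServicesStarts = 0;
    int activeServicesStops = 0;
    int reinitializations = 0;

    synchronized void transitionToActive() {
        if (state == HaState.ACTIVE) {
            return;                        // "Already in active state"
        }
        activeServicesStarts++;            // startActiveServices()
        state = HaState.ACTIVE;
    }

    synchronized void transitionToStandby(boolean initialize) {
        if (state == HaState.STANDBY) {
            return;                        // "Already in standby state"
        }
        HaState previous = state;
        state = HaState.STANDBY;
        if (previous == HaState.ACTIVE) {
            activeServicesStops++;         // stopActiveServices()
            reinitializations++;           // reinitialize(initialize)
        }
    }

    public static void main(String[] args) {
        HaStateModel rm = new HaStateModel();
        rm.transitionToActive();
        rm.transitionToActive();           // no-op
        rm.transitionToStandby(true);
        rm.transitionToStandby(true);      // no-op
        System.out.println(rm.activeServicesStarts + " " + rm.activeServicesStops);  // 1 1
    }
}
```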
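6. Summary
RMActiveServices is not a child service of ResourceManager. Its init/start/stop are handled independently of ResourceManager's child-service lifecycle:
- Startup is driven by the ActiveStandbyElectorBasedElectorService election service.
- State switching is driven by the ZooKeeper watcher and async callbacks inside ActiveStandbyElector.
- Initialization and stopping are done by extra method calls in ResourceManager.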