
ZooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。
ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些更新操作触发了这个 Watcher,就会向指定客户端发送一个事件通知来实现分布式的通知功能。

本篇仅基于客户端对Watcher注册与通知过程进行讲解,涉及到Watcher如何在服务端注册管理,又如何触发事件远程通知客户端的原理后续在讲解服务端源码时会补充。
不过也不要觉得服务端的Watcher注册与通知逻辑就复杂,其实也比较简单,这里简单陈述以保内容完整:
ZooKeeper的Watcher机制是一个跨进程的发布/订阅功能,客户端与服务端都需要保存数据节点和Watcher的关系,当节点的状态信息变更时就会触发一些事件,服务端先从自己的内存中找出节点对应的Watcher列表,然后一个个遍历生成事件通知消息,再远程发送给客户端;客户端接收到对应消息后,解析出Wather事件信息,得知是哪个数据节点,触发什么事件类型,然后客户端同样从内存中找到节点对应的Watcher列表,真正触发事件回调。
一、基础类

1、Watcher
用户注册watcher都需要实现Watcher接口,实现process方法。
org.apache.zookeeper.Watcher
public interface Watcher {
void process(WatchedEvent event);
}
2、WatchedEvent
process(WatchedEvent event)的参数是WatchedEvent,定义事件信息:
// org.apache.zookeeper.WatchedEvent
public class WatchedEvent {
private final KeeperState keeperState;
private final EventType eventType;
private String path;
... ...
/**
* 将WatchedEvent转换为可以通过网络发送的类型
* Convert WatchedEvent to type that can be sent over network
*/
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}
}
WatchedEvent有3个变量,通知状态keeperState、节点事件类型eventType、节点path:
keeperState和eventType都是Watcher中的枚举类。
(1)KeeperState
| KeeperState | 说明 |
|---|---|
| Disconnected(0) | 客户端与服务端断开连接 |
| SyncConnected(3) | 客户端与服务端处于连接状态 |
| AuthFailed(4) | 授权失败 |
| ConnectedReadOnly(5) | 客户端连接到只读服务器。接收到这个状态后,唯一允许的操作是读取操作。这个状态只在只读客户端产生,读写客户端是不允许连接只读服务器的 |
| SaslAuthenticated(6) | 用于通知客户端他们已经通过了SaslAuthenticated,以后可以用sasl授权的权限执行Zookeeper动作 |
| Expired(-112) | 会话超时 |
| Closed(7) | 客户端已关闭。这个状态永远不会由服务器生成,由客户端本地生成。 |
(2)EventType
| EventType | 说明 |
|---|---|
| None(-1) | KeeperState为SyncConnected(3)时,表示客户端与服务端成功建立会话 |
| NodeCreated(1) | 数据节点创建 |
| NodeDeleted(2) | 数据节点被删除 |
| NodeDataChanged(3) | 数据节点的状态信息更新,即使更新内容一样,版本号,一样会触发 |
| NodeChildrenChanged(4) | 数据节点的孩子节点列表发生变更,特指子节点个数和组成情况的变更,即新增子节点或删除子节点,而子节点内容的变化是不会触发这个事件的 |
| DataWatchRemoved(5) | 数据节点的watcher被主动移除 |
| ChildWatchRemoved(6) | 孩子节点的watcher被主动移除 |
| PersistentWatchRemoved (7) | 持久有效的watcher被主动移除 |
3、WatcherEvent
WatcherEvent是可以通过网络发送的事件信息封装。

WatcherEvent和WatchedEvent表示的是同一个事物,都是对一个watcher事件信息的封装,不同的是,WatchedEvent 是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而 WatcherEvent 因为实现了序列化接口,因此可以用于网络传输:
serialize(),可以将Watcher信息序列化到网络字节流中,然后发送到网络中。服务端远程通知客户端watcher时使用。deserialize(),可以从网络字节流中反序列化出Watcher信息。客户端接收到服务端远程通知消息时使用。
无论是WatchedEvent还是WatcherEvent,其对watcher事件信息的封装都是极其简单的,客户端无法直接从事件信息中获取对应数据节点的原始数据内容以及变更后的新数据内容,而是需要客户端再次主动去获取数据。
4、WatchRegistration
WatchRegistration是对watcher注册方式的抽象:

注册的动作是一样的,只是需要注册到不同的集合中,具体继承类,需要实现方法WatchRegistration#getWatches,获取相应集合,将Watcher加入节点path对应的列表中。
如下是抽象类WatchRegistration部分代码:
protected abstract Map<String, Set<Watcher>> getWatches(int rc);
public void register(int rc) {
if (rc == KeeperException.Code.OK.intValue()) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
5、WatcherSetEventPair
用户可能会对一个节点注册多个watcher,服务端远程触发客户端的watcher时,客户端需要将该节点对应的所有watcher都触发一次。
所以WatcherSetEventPair对WatchedEvent和watchers列表进行封装,方便EventThread线程处理watcher触发工作。

6、ZKWatchManager
ZKWatchManager作为客户端watcher管理器,实现了接口ClientWatchManager:

ZKWatchManager中用5个集合对应5种不同的watcher注册场景:
dataWatches,在调用getData、getConfig时注册了watcher,会使用dataWatches来存储watcher。existWatches,对应exists。childWatches,对应getChildren。persistentWatches,给定节点持续有效的watcher集合,触发之后不会被移除。persistentRecursiveWatches,给定节点及其递归所有子节点都持续有效的watcher集合,触发之后不会被移除。

之前网上一直说Zookeeper的观察者注册一次只能触发一次,触发的同时会被移除,如果需要注册一次,可多次有效触发,客户端使用起来比较麻烦。
所以官方弥补了这种场景,新加了persistentWatches和persistentRecursiveWatches两种集合来存储持续有效的watcher,触发之后不会被移除,如果要移除需要调用指定方法ZKWatchManager#removeWatcher,如果想注册持续有效的观察者,也是需要单独调用指定方法ZooKeeper#addWatch。
ZKWatchManager实现了接口ClientWatchManager,主要实现了ClientWatchManager#materialize方法,获取一个应该被触发事件的watcher列表:
org.apache.zookeeper.ZKWatchManager#materialize
@Override
public Set<Watcher> materialize(
Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath
) {
final Set<Watcher> result = new HashSet<>();
switch (type) {
case None:
// ... ...省略None情况,
// 无类型事件,判断 通知状态KeeperState,如果KeeperState不是SyncConnected 就把所有的 watcher容器都清空
// 根据 EventType 从不同的集合中获取观察者列表
// dataWatches、existWatches、childWatches在获取watcher列表时有移除操作
// persistentWatches、persistentRecursiveWatches没有移除操作
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, result);
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, result);
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// TODO This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, result);
break;
default:
String errorMsg = String.format(
"Unhandled watch event type %s with state %s on path %s",
type,
state,
clientPath);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}
return result;
}
private void addPersistentWatches(String clientPath, Set<Watcher> result) {
synchronized (persistentWatches) {
addTo(persistentWatches.get(clientPath), result);
}
synchronized (persistentRecursiveWatches) {
for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
addTo(persistentRecursiveWatches.get(path), result);
}
}
}
由源码可见,从 dataWatches、existWatches、childWatches 集合中获取watcher列表时有移除操作,而从persistentWatches、persistentRecursiveWatches获取时没有移除操作。
二、Watcher注册流程
可以注册watcher的请求都是非事务请求,比如:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public Stat exists(final String path, Watcher watcher)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
... ...
1、构建WatchRegistration
需要注册的Watcher会被封装进一个WatchRegistration对象中,WatchRegistration抽象了注册的方式,会和请求体等一并包装进 Packet。
需要注意,Watcher注册信息不会发送给服务端,而是只发送一个布尔值标注是否注册Watcher(watch=true),这样就减少了数据包的大小,降低了网络压力,同时也使得Watcher注册流程简单。
以getData为例:

2、响应成功后注册Watcher
需要注册Watcher的请求发给服务端后,客户端并不会立刻在自己内存中存储Watcher关系,而是还需要根据请求的响应状态,如果响应状态OK,才会把Watcher注册到ZKWatchManager。

如下图是Wacther注册流程:

三、Watcher通知流程
1、处理事件通知信息
数据节点的状态信息发生变更后,服务端找到该节点的watcher列表,遍历生成事件通知信息发送给客户端。客户端接收到事件通知信息后,反解析出WatcherEvent对象,又转换成WatchedEvent,再提交到EventThread线程处理。
如下是客户端处理事件通知信息NOTIFICATION的部分源码:

2、提交给EventThread线程
从事件通知信息中解析出WatchedEvent后,通过WatchedEvent的三个属性keeperState、eventType和 path从ZKWatchManager中取出符合要求的Watcher列表,然后将WachedEvent对象和Watcher列表封装进 WatcherSetEventPair并添加到waitingEvents队列。

3、遍历waitingEvents队列

4、真正触发Watcher#process

如下图是Watcher通知流程:

四、总结
1、Watcher注册时,客户端只发送了一个布尔值给服务端声明是否需要注册Watcher;只有当服务端那边Wacther注册成功了,且响应成功,客户端这边才会保存Watcher和节点的关系。
2、Wacther通知时,只能从通知信息中得知是哪个节点发生什么事件,而无法得知具体发生了什么变更,要想得知必须再主动获取一次节点信息。
推荐阅读:《从Paxos到Zookeeper:分布式一致性原理与实践》倪超著。
如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。










