第11章 zookeeper应用程序接口
Zookeeper有一个官方的API绑定的Java和C。 ZooKeeper Community提供大多数语言(.NET,Python等)的API。使用ZooKeeper的API,应用程序可以连接、交互、操纵数据、协调,最终从ZooKeeper ensemble断开。
ZooKeeper API具有丰富的功能集来获得的ZooKeeper ensemble的所有功能用一个简单而安全的方式。ZooKeeper API提供同步和异步方法。
ZooKeeper ensemble和ZooKeeper API在各方面完全相得益彰,它有利于开发者用一个很好的方式。
11.1 ZooKeeper API基础
与ZooKeeper ensemble交互的应用被称为ZooKeeper Client或着客户端(Client)。
Znode是ZooKeeper ensemble核心组件,ZooKeeper API提供了一套方法来操作ZooKeeper ensemble中znode的详细信息。
客户端(Client)应遵循下面给出与ZooKeeper ensemble进行清晰整洁互动的步骤。
Ø 连接到ZooKeeper ensemble。ZooKeeper ensemble为客户端分配一个会话的ID。
Ø 定期发送心跳到服务器。否则,ZooKeeper ensemble中到期的会话ID和客户端需要重新连接。
Ø Get/Set znodes只要会话ID是有效的。
Ø 一旦所有的任务都完成, ZooKeeperensemble断开。如果客户端长时间处于非活动状态,那么ZooKeeper ensemble会自动断开客户端。
11.2 Java 绑定
ZooKeeper API的核心部分是ZooKeeperclass。提供连接ZooKeeper ensemble构造的选项,连接方法如下:
Ø connect---连接到ZooKeeper ensemble
Ø create-------创建一个znode
Ø exists--------检查znode是否存在以及节点信息
Ø getData-----从指定节点获取数据
Ø setData-----从指定节点设置数据
Ø getChildren--在指定znode中获取所有可用的sub-nodes
Ø delete----------删除指定znode及所有children
Ø close-----------关闭连接
11.3 连接zookeeper ensemble
zookeeperclass通过它的constructor提供连接的函数。constructor特征如下:
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
参数解释说明:
connectionString----zookeeper ensemble host(服务)
sessionTimeout---会话超时单位用毫秒
watcher----------对象实施“Watcher”的接口。zookeeper ensemble返回连接的status通过watcher对象。
创建一个新的helper类:ZooKeeperConnection并添加一个方法:connect。connect方法创建一个ZooKeeper对象、连接到ZooKeeperensemble、然后返回该对象。
CountDownLatch被用于停止(等待)的主要过程,直到客户端与ZooKeeper ensemble连接到。
ZooKeeperensemble通过Watcher callback返回连接状态。一旦客户端与ZooKeeper ensemble连接,Watcher callback被调用;Watcher callback调用CountDownLatch中CountDownLatch方法释放主进程中的lock、await。
下面是与ZooKeeper ensemble连接的完整代码: ZooKeeperConnection.java
// import java classes
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
// import zookeeper classes
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class ZooKeeperConnection {
// declare zookeeper instance to access ZooKeeper ensemble
private ZooKeeper zoo;
final CountDownLatch connectedSignal = new CountDownLatch(1);
// Method to connect zookeeper ensemble.
public ZooKeeper connect(String host) throws IOException,InterruptedException {
zoo = new ZooKeeper(host,5000,new Watcher() {
public void process(WatchedEvent we) {
if (we.getState() == KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
return zoo;
}
// Method to disconnect from zookeeper server
public void close() throws InterruptedException {
zoo.close();
}
}
11.4 创建Znode
zookeeper类提供create方法来创建ZooKeeper ensemble中新的znode。create方法的参数如下:
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
参数解释说明:
path----------Znode Path。例如,/myapp1,/myapp1/mydata1
data----------在指定znode path存储data
acl-----------被创建node的接入控制列表(ACL)。zookeeper API为了解一些基本的ACL列表提供一个静态接口:ZooDefs.Ids。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开节点的ACL列表
createMode-----node的类型,要么ephemeral、sequential、both。是枚举类型。
创建一个新的Java应用程序来检查ZooKeeperAPI的create功能。创建一个文件ZKCreate.java。在main方法中,创建一个类型ZooKeeperConnection的对象,并调用connect方法来连接的ZooKeeper ensemble。
connect方法将返回ZooKeeper对象ZK。现在,ZK调用对象的自定义路径(path)和数据(data)创建方法。完整的程序代码来创建znode如下:ZKCreate.java
import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
public class ZKCreate {
// create static instance for zookeeper class.
private static ZooKeeper zk;
// create static instance for ZooKeeperConnection class.
private static ZooKeeperConnection conn;
// Method to create znode in zookeeper ensemble
public static void create(String path, byte[] data) throws KeeperException,InterruptedException {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public static void main(String[] args)
{
// znode path
String path = "/MyFirstZnode"; // Assign path to znode
// data in byte array
byte[] data = "My first zookeeper app”.getBytes(); // Declare data
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
create(path, data); // Create the data to the specified path
conn.close();
} catch (Exception e) {
System.out.println(e.getMessage()); //Catch error message
}
}
}
一旦应用程序编译并执行,带有指定数据的znode被创建在zookeeperensemble。用zookeeper CLI zkCli.sh检查它:
启动客服端:bin/zkCli.shstart
调用get:get /MyFirstZnode
11.5Exists---检查znode是否存在
zookeeper类提供exists方法来检查znode的存在。如果指定的znode存在,它返回一个znode的元数据。该exists方法如下:
exists(String path, boolean watcher)
参数解释说明:
path-----znode 路径
watcher----布尔值,给指定znode设置watch
创建一个新的Java应用程序来检查zookeeperAPI 的行exists函数。创建一个文件ZKExists.java。在main方法中,创建ZooKeeper的对象, ZK使用ZooKeeperConnection对象。于是,使用带有自定义的path的zk对象调用exist函数。完整的列表如下:ZKExists.java
import java.io.IOException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKExists {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
// Method to check existence of znode and its status, if znode is available.
public static Stat znode_exists(String path) throws KeeperException,InterruptedException {
return zk.exists(path, true);
}
public static void main(String[] args) throws InterruptedException,KeeperException {
String path= "/MyFirstZnode"; // Assign znode to the specified path
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path); // Stat checks the path of the znode
if(stat!= null) {
System.out.println("Node exists and the node version is " + stat.getVersion());
} else {
System.out.println("Node does not exists");
}
}
catch(Exception e) {
System.out.println(e.getMessage()); // Catches error messages
}
}
}
一旦应用程序被编译、执行,输出信息:Nodeexists and the node version is 1.
11.6 getData方法
zookeeper类提供的getData方法来获取连接在指定znode的数据及其status。GetData方法:
getData(String path, Watcher watcher, Stat stat)
参数解释说明:
path-------Znode 路径
watcher----Watcher类型的回调函数。zookeeper ensemble将通过回调watcher通知,当指点节点数据变化时。一次通知
stat--------返回znode的元数据
创建一个新的Java应用程序帮助理解ZooKeeperAPI中getData函数。创建一个文件ZKGetData.java。在main方法中,使用ZooKeeperConnection对象创建一个ZooKeeper对象ZK。然后,ZK对象调用自定义路径GetData方法。
下面是完整的程序代码,以获取从指定的节点的数据:ZKGetData.java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKGetData {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static Stat znode_exists(String path) throws KeeperException,InterruptedException {
return zk.exists(path,true);
}
public static void main(String[] args) throws InterruptedException, KeeperException {
String path = "/MyFirstZnode";
final CountDownLatch connectedSignal = new CountDownLatch(1);
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path);
if(stat != null) {
byte[] b = zk.getData(path, new Watcher() {
public void process(WatchedEvent we) {
if (we.getType() == Event.EventType.None) {
switch(we.getState()) {
case Expired:
connectedSignal.countDown();
break;
}
} else {
String path = "/MyFirstZnode";
try {
byte[] bn = zk.getData(path, false, null);
String data = new String(bn, "UTF-8");
System.out.println(data);
connectedSignal.countDown();
} catch(Exception ex) {
System.out.println(ex.getMessage());
}
}
}
}, null);
String data = new String(b, "UTF-8");
System.out.println(data);
connectedSignal.await();
} else {
System.out.println("Node does not exists");
}
}
catch(Exception e) {
System.out.println(e.getMessage());
}
}
}
一旦编译并运行,输出:My first zookeeper app。用zookeeper CLIzkCli.sh检查它:
启动客服端:bin/zkCli.shstart
调用set:set /MyFirstZnode Hello
应用程序输出"Hello"并退出。