0
点赞
收藏
分享

微信扫一扫

大数据学习笔记-------------------(12_1)

第11章 zookeeper应用程序接口

Zookeeper有一个官方的API绑定的Java和C。 ZooKeeper Community提供大多数语言(.NET,Python等)的API。使用ZooKeeper的API,应用程序可以连接、交互、操纵数据、协调,最终从ZooKeeper ensemble断开。

ZooKeeper API具有丰富的功能集来获得的ZooKeeper ensemble的所有功能用一个简单而安全的方式。ZooKeeper API提供同步和异步方法。

ZooKeeper ensembleZooKeeper API在各方面完全相得益彰,它有利于开发者用一个很好的方式。

11.1 ZooKeeper API基础

ZooKeeper ensemble交互的应用被称为ZooKeeper Client或着客户端(Client)。

 Znode是ZooKeeper ensemble核心组件,ZooKeeper API提供了一套方法来操作ZooKeeper ensemble中znode的详细信息。

 客户端(Client)应遵循下面给出与ZooKeeper ensemble进行清晰整洁互动的步骤。

Ø  连接到ZooKeeper ensembleZooKeeper ensemble为客户端分配一个会话的ID。

Ø  定期发送心跳到服务器。否则,ZooKeeper ensemble中到期的会话ID和客户端需要重新连接。

Ø  Get/Set znodes只要会话ID是有效的。

Ø  一旦所有的任务都完成, ZooKeeperensemble断开。如果客户端长时间处于非活动状态,那么ZooKeeper ensemble会自动断开客户端。

11.2 Java 绑定

ZooKeeper API的核心部分是ZooKeeperclass。提供连接ZooKeeper ensemble构造的选项,连接方法如下:

Ø  connect---连接到ZooKeeper ensemble

Ø  create-------创建一个znode

Ø  exists--------检查znode是否存在以及节点信息

Ø  getData-----从指定节点获取数据

Ø  setData-----从指定节点设置数据

Ø  getChildren--在指定znode中获取所有可用的sub-nodes

Ø  delete----------删除指定znode及所有children

Ø  close-----------关闭连接

11.3 连接zookeeper ensemble

zookeeperclass通过它的constructor提供连接的函数。constructor特征如下:  

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)


 参数解释说明:

connectionString----zookeeper ensemble host(服务)

sessionTimeout---会话超时单位用毫秒

watcher----------对象实施“Watcher”的接口。zookeeper ensemble返回连接的status通过watcher对象。

 创建一个新的helper类:ZooKeeperConnection并添加一个方法:connect。connect方法创建一个ZooKeeper对象、连接到ZooKeeperensemble、然后返回该对象。

CountDownLatch被用于停止(等待)的主要过程,直到客户端与ZooKeeper ensemble连接到。

ZooKeeperensemble通过Watcher callback返回连接状态。一旦客户端与ZooKeeper ensemble连接,Watcher callback被调用;Watcher callback调用CountDownLatch中CountDownLatch方法释放主进程中的lock、await

下面是与ZooKeeper ensemble连接的完整代码: ZooKeeperConnection.java 



// import java classes
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
// import zookeeper classes
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class ZooKeeperConnection {
// declare zookeeper instance to access ZooKeeper ensemble
private ZooKeeper zoo;
final CountDownLatch connectedSignal = new CountDownLatch(1);
// Method to connect zookeeper ensemble.
public ZooKeeper connect(String host) throws IOException,InterruptedException {
zoo = new ZooKeeper(host,5000,new Watcher() {
public void process(WatchedEvent we) {
if (we.getState() == KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
return zoo;
}
// Method to disconnect from zookeeper server
public void close() throws InterruptedException {
zoo.close();
}
}

11.4 创建Znode

zookeeper类提供create方法来创建ZooKeeper ensemble中新的znode。create方法的参数如下:      

          

create(String path, byte[] data, List<ACL> acl, CreateMode createMode)


参数解释说明:

path----------Znode Path。例如,/myapp1,/myapp1/mydata1

data----------在指定znode path存储data

acl-----------被创建node的接入控制列表(ACL)。zookeeper API为了解一些基本的ACL列表提供一个静态接口:ZooDefs.Ids。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开节点的ACL列表

createMode-----node的类型,要么ephemeral、sequential、both。是枚举类型。

创建一个新的Java应用程序来检查ZooKeeperAPI的create功能。创建一个文件ZKCreate.java。在main方法中,创建一个类型ZooKeeperConnection的对象,并调用connect方法来连接的ZooKeeper ensemble

connect方法将返回ZooKeeper对象ZK。现在,ZK调用对象的自定义路径(path)和数据(data)创建方法。完整的程序代码来创建znode如下:ZKCreate.java 

                                           

import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
public class ZKCreate {
// create static instance for zookeeper class.
private static ZooKeeper zk;
// create static instance for ZooKeeperConnection class.
private static ZooKeeperConnection conn;
// Method to create znode in zookeeper ensemble
public static void create(String path, byte[] data) throws KeeperException,InterruptedException {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public static void main(String[] args)
{
// znode path
String path = "/MyFirstZnode"; // Assign path to znode
// data in byte array
byte[] data = "My first zookeeper app”.getBytes(); // Declare data
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
create(path, data); // Create the data to the specified path
conn.close();
} catch (Exception e) {
System.out.println(e.getMessage()); //Catch error message
}
}
}


一旦应用程序编译并执行,带有指定数据的znode被创建在zookeeperensemble。用zookeeper CLI zkCli.sh检查它:

 启动客服端:bin/zkCli.shstart

调用get:get /MyFirstZnode

11.5Exists---检查znode是否存在

zookeeper类提供exists方法来检查znode的存在。如果指定的znode存在,它返回一个znode的元数据。该exists方法如下:         

exists(String path, boolean watcher)

参数解释说明:

path-----znode 路径

watcher----布尔值,给指定znode设置watch

创建一个新的Java应用程序来检查zookeeperAPI 的行exists函数。创建一个文件ZKExists.java。在main方法中,创建ZooKeeper的对象, ZK使用ZooKeeperConnection对象。于是,使用带有自定义的path的zk对象调用exist函数。完整的列表如下:ZKExists.java      

    

import java.io.IOException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKExists {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
// Method to check existence of znode and its status, if znode is available.
public static Stat znode_exists(String path) throws KeeperException,InterruptedException {
return zk.exists(path, true);
}
public static void main(String[] args) throws InterruptedException,KeeperException {
String path= "/MyFirstZnode"; // Assign znode to the specified path
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path); // Stat checks the path of the znode
if(stat!= null) {
System.out.println("Node exists and the node version is " + stat.getVersion());
} else {
System.out.println("Node does not exists");
}
}
catch(Exception e) {
System.out.println(e.getMessage()); // Catches error messages
}
}
}


一旦应用程序被编译、执行,输出信息:Nodeexists and the node version is 1.  

11.6 getData方法

zookeeper类提供的getData方法来获取连接在指定znode的数据及其status。GetData方法:

getData(String path, Watcher watcher, Stat stat)


参数解释说明:

 path-------Znode 路径

watcher----Watcher类型的回调函数。zookeeper ensemble将通过回调watcher通知,当指点节点数据变化时。一次通知

 stat--------返回znode的元数据

创建一个新的Java应用程序帮助理解ZooKeeperAPI中getData函数。创建一个文件ZKGetData.java。在main方法中,使用ZooKeeperConnection对象创建一个ZooKeeper对象ZK。然后,ZK对象调用自定义路径GetData方法。

下面是完整的程序代码,以获取从指定的节点的数据:ZKGetData.java 

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZKGetData {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static Stat znode_exists(String path) throws KeeperException,InterruptedException {
return zk.exists(path,true);
}
public static void main(String[] args) throws InterruptedException, KeeperException {
String path = "/MyFirstZnode";
final CountDownLatch connectedSignal = new CountDownLatch(1);
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path);
if(stat != null) {
byte[] b = zk.getData(path, new Watcher() {
public void process(WatchedEvent we) {
if (we.getType() == Event.EventType.None) {
switch(we.getState()) {
case Expired:
connectedSignal.countDown();
break;
}
} else {
String path = "/MyFirstZnode";
try {
byte[] bn = zk.getData(path, false, null);
String data = new String(bn, "UTF-8");
System.out.println(data);
connectedSignal.countDown();
} catch(Exception ex) {
System.out.println(ex.getMessage());
}
}
}
}, null);
String data = new String(b, "UTF-8");
System.out.println(data);
connectedSignal.await();
} else {
System.out.println("Node does not exists");
}
}
catch(Exception e) {
System.out.println(e.getMessage());
}
}
}

一旦编译并运行,输出:My first zookeeper app。用zookeeper CLIzkCli.sh检查它:

 启动客服端:bin/zkCli.shstart

调用set:set /MyFirstZnode Hello

应用程序输出"Hello"并退出。

 


举报

相关推荐

大数据学习12之HBase

0 条评论