Java 使用 Curator 的 API 操作 ZooKeeper

阅读 68

2022-02-10

依赖

<!-- Apache Curator recipes artifact; the test class below uses CuratorFramework and InterProcessMutex. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

<!-- JUnit 5 (Jupiter) API, limited to the test scope. -->
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-api</artifactId>
    <version>5.8.2</version>
    <scope>test</scope>
</dependency>
public class TestCuratorService {

    private String zookeeperConnectionString = "127.0.0.1:2181";

    private CuratorFramework client;

    /**
     * 连接初始化
     * 方式一
     */
    @Before
    public void initClient1() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    /**
     * 连接初始化
     * 方式二
     */
    public void initClient2() {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
        		//zookeeper 的连接地址
                .connectString(zookeeperConnectionString)
                .connectionTimeoutMs(10000)
                .sessionTimeoutMs(10000)
                .retryPolicy(retry).build();
        client.start();
    }

    @Test
    public void dataIsExists() throws Exception {
        String path = "/test/aaaa";
        Stat stat = client.checkExists()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听回调");
                    }
                })
                .forPath(path);
        System.out.println(stat);
    }

    @Test
    public void createPath() throws Exception {
        String path = "/test/aaaa";
        String res = client.create()
                // 递归创建path
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(path, "path数据内容".getBytes());
        System.out.println(res);
    }

    @Test
    public void deletePath() throws Exception {
        String path = "/test/aaaa";
        client.delete()
                // 强制删除
                .guaranteed()
                // 递归删除
                .deletingChildrenIfNeeded()
                //.withVersion(-1)
                .forPath(path);
    }

    @Test
    public void getPath() throws Exception {
        String path = "/test";
        CountDownLatch countDownLatch = new CountDownLatch(1);
        List<String> childPaths = client.getChildren()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听回调");
                    }
                })
                // inBackground会导致forPath的返回为null
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {
                        System.out.println("异步操作");
                        List<String> pathList = curatorFramework.getChildren().forPath(path);
                        System.out.println(pathList);
                        countDownLatch.countDown();
                    }
                })
                .forPath(path);
        countDownLatch.await();
    }

    @Test
    public void getData() throws Exception {
        String path = "/test";
        byte[] bytes = client.getData()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听回调");
                    }
                })
                /*
                // inBackground会导致forPath的返回为null
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {
                        System.out.println("异步操作");
                    }
                })
                */
                .forPath(path);
        String data = new String(bytes);
        System.out.println(data);
    }

    @Test
    public void updateData() throws Exception {
        String path = "/test";
        client.setData().forPath(path, "数据内容".getBytes());
    }

    /**
     * 事务操作
     */
    @Test
    public void transaction() throws Exception {
        //创建事务
        CuratorOp curatorOp = client.transactionOp().delete().forPath("/test/aaa");
        //执行事务
        List<CuratorTransactionResult> results = client.transaction().forOperations(curatorOp);
    }

    /**
     * 分布式锁
     */
    @Test
    public void zookeeperLock() throws Exception {
        String lockPath = "/test/lock";
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        // 加锁
        if (lock.acquire(5, TimeUnit.SECONDS)) {
            try {
                // 业务操作
            } finally {
                // 解锁
                lock.release();
            }

        }
    }
}

精彩评论(0)

0 0 举报