热门IT资讯网

zookeeper Java api

发表于:2024-11-25 作者:热门IT资讯网编辑
编辑最后更新 2024年11月25日,maven依赖: 4.0.0 per.ym zk 0.0.1-SNAPSHOT org.apache.zookeeper zookeeper 3.4.

maven依赖:

  4.0.0  per.ym  zk  0.0.1-SNAPSHOT            org.apache.zookeeper      zookeeper      3.4.12                junit        junit        4.12        test      

测试类:

package per.ym.zookeeper;import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.AsyncCallback.ChildrenCallback;import org.apache.zookeeper.AsyncCallback.DataCallback;import org.apache.zookeeper.AsyncCallback.StatCallback;import org.apache.zookeeper.AsyncCallback.StringCallback;import org.apache.zookeeper.AsyncCallback.VoidCallback;import org.apache.zookeeper.KeeperException.Code;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;public class ZkTest implements Watcher{    private ZooKeeper zk;    private CountDownLatch cdl;    private String path = "/test";    private String rootPath = "/";    private int sessionTimeOut = 15000;    private byte[] data = "data".getBytes();    private byte[] newData = "newData".getBytes();    @Before    public void connect() throws IOException {        zk = new ZooKeeper("192.168.61.131:2184", sessionTimeOut, this);        cdl = new CountDownLatch(1);    }    //同步调用    @Test    public void testSync( ) throws Exception {        //等待与zookeeper服务端连接完成        cdl.await();        //创建一个持久节点/test,并为其赋值为data        zk.create("/test", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        //获取根节点下的子节点        List children = zk.getChildren("/", this);        System.out.println("根节点下的子节点有: " + children);        //查询节点/test保存的数据        Stat stat = new Stat();        byte[] tempData = zk.getData(path, this, stat);        System.out.println("test节点数据为: " + new String(tempData));        //设置节点/test的数据为newData        zk.setData(path, newData, -1);        tempData = zk.getData(path, this, stat);        System.out.println("test节点新数据为: " + new String(tempData));        //删除节点/test        zk.delete(path, -1);        //判断节点/test是否存在        stat = zk.exists(path, this);        System.out.println(stat);    }    //创建节点后进行回调    private StringCallback cb = new StringCallback() {        @Override        public void proce***esult(int rc, String path, Object ctx, String name) {            switch (Code.get(rc)) {                 case CONNECTIONLOSS:                    //如果失去连接,我们无法保证是在创建前还是创建后丢失的,因此重试                    create();                    break;                case OK:                    System.out.println("节点/test创建成功");                    break;                case NODEEXISTS:                    System.out.println("节点/test已经存在");                    break;                default:                    System.out.println(KeeperException.create(Code.get(rc), path));                }        }    };    //获取子节点进行回调    private ChildrenCallback ccb = new ChildrenCallback() {        @Override        public void proce***esult(int rc, String path, Object ctx, List children) {            switch (Code.get(rc)) {                case CONNECTIONLOSS:                    getChildren();                    break;                case OK:                    System.out.println("根节点下的子节点有: " + children);                    break;                default:                    System.out.println(KeeperException.create(Code.get(rc), path));            }        }    };    //获取数据进行回调    private DataCallback dcb = new DataCallback() {        @Override        public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) {            switch (Code.get(rc)) {                case CONNECTIONLOSS:                    getData();                    break;                case OK:                    System.out.println("test节点数据为: " + new String(data));                    break;                default:                    System.out.println(KeeperException.create(Code.get(rc), path));            }        }    };    //设置数据进行回调    private StatCallback scb = new StatCallback() {        @Override        public void proce***esult(int rc, String path, Object ctx, Stat stat) {            switch (Code.get(rc)) {                case CONNECTIONLOSS:                    setData();                    break;                case OK:                    //这个ctx就是我们调用zk.setData时传入的最后一个参数                    System.out.println("test节点设置新数 " + new String((byte[])ctx) + "成功");                    break;                default:                    System.out.println(KeeperException.create(Code.get(rc), path));            }        }    };    //删除节点进行回调    private VoidCallback vcb = new VoidCallback() {        @Override        public void proce***esult(int rc, String path, Object ctx) {            switch (Code.get(rc)) {                case CONNECTIONLOSS:                    delete();                    break;                case OK:                    System.out.println("删除节点/test成功");                    break;                default:                    System.out.println(KeeperException.create(Code.get(rc), path));            }        }    };    //节点是否存在进行回调    private StatCallback scb2 = new StatCallback() {        @Override        public void proce***esult(int rc, String path, Object ctx, Stat stat) {            switch (Code.get(rc)) {                 case CONNECTIONLOSS:                    exists();                    break;                case OK:                    System.out.println("节点/test存在");                    break;                case NONODE:                    System.out.println("节点/test不存在");                    break;                default:                    break;            }        }    };    //异步调用    @Test    public void testAsync() throws Exception {        cdl.await();        create();        getChildren();        setData();        getData();        exists();        delete();        exists();    }    private void create() {        zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, data);    }    private void getChildren() {        zk.getChildren(rootPath, this, ccb, null);    }    private void getData() {        zk.getData(path, this, dcb, null);    }    private void setData() {        zk.setData(path, newData, -1, scb, newData);    }    private void delete() {        zk.delete(path, -1, vcb, null);    }    private void exists() {        zk.exists(path, this, scb2, null);    }    @Override    public void process(WatchedEvent event) {        //如果连接成功,放行        if (event.getType().equals(EventType.None)) {            cdl.countDown();        }        System.out.println(event);    }    @After    public void close() throws InterruptedException {        //测试完成后关闭连接        zk.close();    }}
0