栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

java 使用 curator 的 API 操作 zookeeper

java 使用 curator 的 API 操作 zookeeper

依赖


<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>



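<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-api</artifactId>
    <version>5.8.2</version>
    <scope>test</scope>
</dependency>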

public class TestCuratorService {

    private String zookeeperConnectionString = "127.0.0.1:2181";

    private Curatorframework client;

    
    @Before
    public void initClient1() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorframeworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    
    public void initClient2() {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
        client = CuratorframeworkFactory.builder()
        		//zookeeper 的连接地址
                .connectString(zookeeperConnectionString)
                .connectionTimeoutMs(10000)
                .sessionTimeoutMs(10000)
                .retryPolicy(retry).build();
        client.start();
    }

    @Test
    public void dataIsExists() throws Exception {
        String path = "/test/aaaa";
        Stat stat = client.checkExists()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听回调");
                    }
                })
                .forPath(path);
        System.out.println(stat);
    }

    @Test
    public void createPath() throws Exception {
        String path = "/test/aaaa";
        String res = client.create()
                // 递归创建path
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(path, "path数据内容".getBytes());
        System.out.println(res);
    }

    @Test
    public void deletePath() throws Exception {
        String path = "/test/aaaa";
        client.delete()
                // 强制删除
                .guaranteed()
                // 递归删除
                .deletingChildrenIfNeeded()
                //.withVersion(-1)
                .forPath(path);
    }

    @Test
    public void getPath() throws Exception {
        String path = "/test";
        CountDownLatch countDownLatch = new CountDownLatch(1);
        List childPaths = client.getChildren()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听回调");
                    }
                })
                // inBackground会导致forPath的返回为null
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(Curatorframework curatorframework, CuratorEvent curatorEvent) {
                        System.out.println("异步操作");
                        List pathList = curatorframework.getChildren().forPath(path);
                        System.out.println(pathList);
                        countDownLatch.countDown();
                    }
                })
                .forPath(path);
        countDownLatch.await();
    }

    @Test
    public void getData() throws Exception {
        String path = "/test";
        byte[] bytes = client.getData()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听回调");
                    }
                })
                
                .forPath(path);
        String data = new String(bytes);
        System.out.println(data);
    }

    @Test
    public void updateData() throws Exception {
        String path = "/test";
        client.setData().forPath(path, "数据内容".getBytes());
    }

    
    @Test
    public void transaction() throws Exception {
        //创建事务
        CuratorOp curatorOp = client.transactionOp().delete().forPath("/test/aaa");
        //执行事务
        List results = client.transaction().forOperations(curatorOp);
    }

    
    @Test
    public void zookeeperLock() throws Exception {
        String lockPath = "/test/lock";
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        // 加锁
        if (lock.acquire(5, TimeUnit.SECONDS)) {
            try {
                // 业务操作
            } finally {
                // 解锁
                lock.release();
            }

        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735430.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号