栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

三、zookeeper基于javaApi实现zookeeper增删改查以及分布式锁

三、zookeeper基于javaApi实现zookeeper增删改查以及分布式锁

先行步骤

spring boot中引入如下依赖

	
			org.apache.zookeeper
			zookeeper
			3.6.2
		
创建多级节点
  
    @Test
    void createDeepNode() {
        ZooKeeper client = null;
        try {
            client = new ZooKeeper(ipAddress, 50000, null);
            client.create("/updateVideo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            client.create("/updateVideo/dance", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            client.create("/updateVideo/dance/20201101", "dance dance dance".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//           client.setData("/更新视频", "这是Data,可以写一些关于业务的参数".getBytes(), -1);

//            client.delete("/更新视频", -1);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }


    }

结果

[zk: localhost:2181(CONNECTED) 0] ls -R /updateVideo
/updateVideo
/updateVideo/dance
/updateVideo/dance/20201101

查看节点是否存在示例
@Test
    void isNodeExist() {
        ZooKeeper client = null;
        try {
            client = new ZooKeeper(ipAddress, 50000, null);
            Stat stat = client.exists("/updateVideo", false);

            logger.info("当前节点是否存在 {}", stat != null ? "存在" : "不存在");
        } catch (Exception e) {

        } finally {
            try {
                client.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


    }
获取结果
 @Test
    void getData() throws Exception {
        ZooKeeper client = null;
        client = new ZooKeeper(ipAddress, 50000, null);
        byte[] data = client.getData("/updateVideo/dance/20201101", false, null);
        logger.info("result {}", new String(data));
        client.close();
    }
获取子节点
  @Test
    void getChildren() throws Exception {
        ZooKeeper client = null;
        client = new ZooKeeper(ipAddress, 50000, null);
        List children = client.getChildren("/updateVideo", false);
        logger.info("result {}", children);
        client.close();
    }
设置订阅 第一种

设置defaultWatcher,无论何种动作都用以此类作为订阅者

@Test
    void register1() throws Exception {
        // 方式1
        ZooKeeper client = new ZooKeeper(ipAddress, 50000, new Watcher() {
            // 这个就是 defaultWatcher 参数,是当前客户端默认的回调实现
            @Override
            public void process(WatchedEvent event) {
                System.out.println("这是本客户端全局的默认回调对象");
            }
        });
// exists
        client.exists("/updateVideo", true);
// getData
        client.getData("/updateVideo/dance", true, null);
// getChildren
        client.getChildren("/updateVideo", true);
        client.close();
    }
第二种(推荐)

在对应操作里面传入对应订阅者对象,这样做可以保证这个类只作为这个路径的回调对象

 @Test
    void register2() throws Exception {
        ZooKeeper client = null;
        client = new ZooKeeper(ipAddress, 50000, null);
// 方式2
// exists
        client.exists("/updateVideo", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("我是回调对象的实现");
            }
        });
// getData
        client.getData("/updateVideo/dance", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("我是回调对象的实现");
            }
        }, null);
// getChildren
        client.getChildren("/updateVideo", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("我是回调对象的实现");
            }
        });

        client.close();

    }
分布式锁

可参考这个开源项目
Curator 框架整合 Spring Boot 操作 ZooKeeper 并使用分布式工具,示范项目

关键代码解释 zk锁配置
package com.github.hellogithub;

import org.apache.curator.framework.Curatorframework;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zookeeper.config.CuratorframeworkFactoryBean;
import org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry;


@Configuration
public class ZookeeperLockConfiguration {
    @Value("${zookeeper.host:xxxx:2181}")
    private String zkUrl;

    @Bean
    public CuratorframeworkFactoryBean curatorframeworkFactoryBean() {
        return new CuratorframeworkFactoryBean(zkUrl);
    }

    @Bean
    public ZookeeperLockRegistry zookeeperLockRegistry(Curatorframework curatorframework) {

        return new ZookeeperLockRegistry(curatorframework, "/HG-lock");
    }
}

创建两个测试映射

如下文所示,我们先调用http://127.0.0.1:9999/lock10,这个接口会得到lock对象,然后休眠10s。
这时,我们又立刻调用http://127.0.0.1:9999/immediate,这个接口会在/lock10释放锁候得到锁。时间间隔大概10s左右,这就是zk在高并发场景实现分布式锁的示例。

package com.github.hellogithub;

import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;


@RestController
public class TestController {

    @Resource
    private LockRegistry lockRegistry;

    @GetMapping("debug")
    public String dubug() {
        System.out.println(lockRegistry);
        return "SSSS";
    }

    @GetMapping("/lock10")
    public String lock10() {
        System.out.println("lock10 start " + System.currentTimeMillis());
        final Lock lock = lockRegistry.obtain("lock");
        try {
            lock.lock();
            System.out.println("lock10 get lock success " + System.currentTimeMillis());
            TimeUnit.SECONDS.sleep(10);
        } catch (Exception e) {
        } finally {
            lock.unlock();
        }
        return "OK";
    }

    @GetMapping("/immediate")
    public String immediate() {
        System.out.println("immediate start " + System.currentTimeMillis());
        final Lock lock = lockRegistry.obtain("lock");
        try {
            lock.lock();
            System.out.println("immediate get lock success " + System.currentTimeMillis());
        } finally {
            lock.unlock();
        }
        return "immediate return";
    }
}

控制台输出结果
lock10 start 1633186840089
lock10 get lock success 1633186840146
immediate start 1633186841520
immediate get lock success 1633186850213
操作锁定时zk下节点的变化

可以看出,两个映射在取锁时,会先后在zk上创建临时顺序节点,然后分先后顺序完成操作候释放锁,并删除临时节点

[zk: localhost:2181(CONNECTED) 9] ls -R /HG-lock
/HG-lock
/HG-lock/lock
/HG-lock/lock/_c_4861b729-4e90-4c94-bdf1-10a55aec213f-lock-0000000001
/HG-lock/lock/_c_f89f6356-4905-43b2-9a4a-5d4f2af9e7bc-lock-0000000000
[zk: localhost:2181(CONNECTED) 10] ls -R /HG-lock
/HG-lock
/HG-lock/lock

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/285725.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号