栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot使用Curator 集成zk

Springboot使用Curator 集成zk

前言

这篇文章主要基于客户端在日常工作中使用zk的角度来写,前面南国已经就zk的理论知识做过一些讲述,对此不太熟悉的可以往前看看。Curator是Apache提供的一个zk的工具包,简化了 ZooKeeper 的操作。它增加了很多使用 ZooKeeper 开发的特性,可以处理 ZooKeeper 集群复杂的连接管理和重试机制。这里我们使用springboot 集成curator来操作zk。
假设你的服务已经正确添加了zk的相关依赖,Curator maveny依赖如下,


    org.apache.curator
    curator-framework
    4.2.0
    
        
            org.apache.zookeeper
            zookeeper
        
    



	org.apache.curator
	curator-recipes
	4.2.0
	
		
			org.apache.curator
			curator-framework
		
	

正文

Curator 功能分两大类,一类是对 ZooKeeper 的一些基本命令的封装,比如增删改查,即 framework 模块;另一类是他的高级特性,即 Recipes 模块。

创建Curatorframework

Curator 框架通过 CuratorframeworkFactory 可以通过工厂模式或者builder 模式创建 Curatorframework 实例。Curatorframework 实例都是线程安全的,你应该在你的应用中共享同一个 Curatorframework 实例。
在springboot中相当于创建一个curator的配置类

@Slf4j
@Configuration
public class ZkCoreClient {
    // zk 服务端集群地址
    @Value("${zk.url}")
    private String zkUrl;

    // session 超时时间
    private int timeOut = 60000;

    // zkclient 重试间隔时间
    private int baseSleepTimeMs = 5000;

    //zkclient 重试次数
    private int retryCount = 5;


    
    @Bean
    public Curatorframework init() {
        Curatorframework client = CuratorframeworkFactory
                .builder()
                .connectString(zkUrl)
                .sessionTimeoutMs(timeOut)
                .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, retryCount))
//                            .namespace(appName)
                .build();
        // 或者使用工厂模式
//                    client = CuratorframeworkFactory.newClient(zkUrl,new ExponentialBackoffRetry(baseSleepTimeMs,retryCount)).usingNamespace(appName);
        client.start();
        log.info("client is created at ================== {}", LocalDateTime.now());

        return client;
    }

}
查看curator的基本用法

@Slf4j
public class ZkUtils {
    @Autowired
    Curatorframework client;

    

    public String createNode(String path, String value) throws Exception {
        return createNode(path, value, true);
    }

    public String createNode(String path, String value, Boolean isEphemeral) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        String node = client
                .create()
                .creatingParentsIfNeeded()
                .withMode(isEphemeral.equals(true) ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.PERSISTENT_SEQUENTIAL) // 临时顺序节点/持久顺序节点
                .forPath(path, value.getBytes());

        log.info("create node : {}", node);
        return node;
    }

    
    public void deleteNode(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        client.delete()
                .guaranteed() // 保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
                .deletingChildrenIfNeeded() // 若当前节点包含子节点,子节点也删除
                .forPath(path);
        log.info("{} is deleted ", path);
    }

    
    public Stat isExists(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        return client.checkExists().forPath(path);
    }

    
    public List getChildren(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        return client.getChildren()
                .forPath(path);
    }


    
    public String getNodeData(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        Stat stat = new Stat();
        byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
        log.info("{} data is : {}", path, new String(bytes));
        log.info("current stat version is {}, createTime is {}", stat.getVersion(), stat.getCtime());
        return new String(bytes);
    }


    
    public void setNodeData(String path, String value) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        Stat stat = client.checkExists().forPath(path);
        if (null == stat) {
            log.info(String.format("{} Znode is not exists", path));
            throw new RuntimeException(String.format("{} Znode is not exists", path));
        }
        String nodeData = getNodeData(path);
        client.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());
        log.info("{} Znode data is set. old vaule is {}, new data is {}", path, nodeData, value);
    }


    
    public void addWatcherWithNodeCache(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        // dataIsCompressed if true, data in the path is compressed
        NodeCache nodeCache = new NodeCache(client, path, false);
        NodeCacheListener listener = () -> {
            ChildData currentData = nodeCache.getCurrentData();
            log.info("{} Znode data is chagnge,new data is ---  {}", currentData.getPath(), new String(currentData.getData()));
        };
        nodeCache.getListenable().addListener(listener);
        nodeCache.start();
    }


    
    public void addWatcherWithChildCache(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        //cacheData if true, node contents are cached in addition to the stat
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, false);
        PathChildrenCacheListener listener = (client, event) -> {
            log.info("event path is --{} ,event type is {}", event.getData().getPath(), event.getType());
        };
        pathChildrenCache.getListenable().addListener(listener);
        // StartMode : NORMAL  BUILD_INITIAL_CACHE  POST_INITIALIZED_EVENT
        // NORMAL:异步初始化, BUILD_INITIAL_CACHE:同步初始化, POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }

    
    public void addWatcherWithTreeCache(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        TreeCache treeCache = new TreeCache(client, path);
        TreeCacheListener listener = (client, event) -> {
            log.info("节点路径 --{} ,节点事件类型: {} , 节点值为: {}", Objects.nonNull(event.getData()) ? event.getData().getPath() : "无数据", event.getType());
        };
        treeCache.getListenable().addListener(listener);
        treeCache.start();
    }

}

Curator Recipes的使用

Recipes 模块主要有 Elections (选举)、Locks (锁)、Barriers (关卡)、Atomic (原子量)、Caches、Queues 等

Electtions 选举

选举主要依赖于 LeaderSelector 和 LeaderLatch 两个类。前者是所有存活的客户端不间断的轮流做 Leader。后者是一旦选举出 Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。这两者在实现上是可以切换的。

Locks(分布式锁)

curator 提供了InterProcessMutex来实现zk的分布式锁,他用acquire获取锁 release释放锁。

ZooKeeper分布式锁:
优点
ZooKeeper分布式锁(如InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单
缺点
ZooKeeper实现的分布式锁,性能并不太高。
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁暂时节点来实现锁功能,
Zk中创建和删除节点只能通过Leader(主)服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower(从)服务器上,这样频繁的网络通信,系统性能会下降。

总之,在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁,如果在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁。
目前分布式锁,比较成熟、主流的方案有两种:

基于Redis的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
基于ZooKeeper的分布式锁。适用于高可靠,而并发量不是太高的场景
在选型时,选择适合于自己业务场景的方案即可。

简单的demo如下

@Autowired
    Curatorframework client;

    
    public  TwoTuple tryLock(LockCallback lockCallback, String lockKey, Long timeout) {
        InterProcessMutex lock = new InterProcessMutex(client, lockKey);
        try {
            if (lock.acquire(timeout, TimeUnit.MILLISECONDS)) {
                log.info(Thread.currentThread().getName() + " get lock");
                return new TwoTuple<>(true, lockCallback.exec());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                log.info(Thread.currentThread().getName() + " release lock");
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return new TwoTuple<>(false, null);
    }
服务注册发现

maven 添加依赖

        
            org.apache.curator
            curator-x-discovery
            4.2.0
        
    public void registerService(String serviceName, String... urls) throws Exception {
        ServiceInstanceBuilder serviceInstanceBuilder = ServiceInstance.builder();
        serviceInstanceBuilder.address(InetAddress.getLocalHost().getHostAddress());
        serviceInstanceBuilder.port(Integer.parseInt(environment.getProperty("server.port")));
        serviceInstanceBuilder.name(serviceName);
        Map config = new HashMap();
        config.put("url", urls);
        serviceInstanceBuilder.payload(config);

        ServiceInstance serviceInstance = serviceInstanceBuilder.build();
        ServiceDiscovery serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                .client(client).serializer(new JsonInstanceSerializer(Map.class))
                .basePath(SERVICE_ROOT_PATH).build();

        serviceDiscovery.registerService(serviceInstance);
        serviceDiscovery.start();
    }

    
    public void discovery(String serviceName) {
        try {
            ServiceDiscovery serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                    .client(client).basePath(SERVICE_ROOT_PATH).build();
            serviceDiscovery.start();
            //根据名称获取服务
            Collection> services = serviceDiscovery.queryForInstances(serviceName);
            for (ServiceInstance service : services) {
                System.out.print(service.getPayload() + " -- ");
                System.out.println(service.getAddress() + ":" + service.getPort());
            }
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

参考资料:
https://blog.csdn.net/smartbetter/article/details/53083816
https://www.icode9.com/content-1-116250.html

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762169.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号