栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

zookeeper-1.3 JAVA分布式锁的实现及watch代码实现封装

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

zookeeper-1.3 JAVA分布式锁的实现及watch代码实现封装

zookeeper-4

zookeeper 2和3系列正在画图,就先把代码实现的部分发表出来,后面会补上2-3系列的。如需转载请标注一下备注


文章目录

zookeeper-4一、zk实现分布式锁

zk锁问题的解决zk分布式锁的代码实现

curator对zk的api做了封装,直接使用curator,初始化zk锁实现对zk代码进行封装 zk的watch实现

zk的简单节点操作watch的封装 总结


一、zk实现分布式锁 zk要实现分布式锁,首先需要了解分布式锁实现的几个问题:
    争抢锁-(只有一个人获得了锁)获取锁的那个人出现了问题?获取锁的人成功抢到了锁-》锁释放锁释放了删除了,别人怎么知道

zk锁和redis锁虽然都是分布式锁,但是本质上是不一样的,例如redis锁的实现的几个问题:

    争抢锁 - (只有一个人抢到了)锁没有过期时间,业务挂了,锁还继续?设置锁超时时间,时间超时了但业务没执行完?设置较长的过期时间,但业务挂了,锁还继续?锁的时间多长合适?

这些都是加锁带来的问题。ZK锁的几个问题在这里会给出讲解,而redis锁的问题在下个系列redis中会讲解出来。

zk锁问题的解决

    获取锁的那个人出现了问题?
    解决:在zk中znode是有几个特点,第一章的时候讲过,znode包含(持久节点,持久序列节点,临时节点,临时序列节点)在这里可以使用znode的临时节点。当加锁的人出现了问题,会断开和zk的链接,(临时节点特点是:当client断开链接后,临时节点消失)临时节点的消失,是不是就意味着锁消失了。
    那么zk锁和redis锁一对比大家是不是就发现,在redis里面你需要对锁设置时间,还需要额外开启安全线程对锁进行监控。例如业务没执行完需要增加锁的超时时间等。成本就上去了。

    zk锁释放了,别人怎么知道?三种方案
    (1)主动轮训,心跳 。 弊端:延迟,多台机器对zookeeper压力
    (2)zk的watch来实现,可以解决延迟问题。 弊端:多台机器对zookeeper有压力
    (3)利用序列节点+watch 来实现,watch谁?每个序列节点watch前面一个(这里可以理解为队列,每个队列的后面一个watch前面一个目标),最小的一个获得锁,一旦最小的释放了锁,后面一个就会根据watch来获取锁,这样就不用队列所有都对节点watch。成本:zk只给第二个发事件回调。减少了zookeeper压力

zk分布式锁的代码实现

下面展示一些 代码片。倒入maven中zk的jar

		
            org.apache.curator
            curator-recipes
            
                
                    org.apache.zookeeper
                    zookeeper
                
            
        

        
            org.apache.curator
            curator-framework
        

        
            org.apache.zookeeper
            zookeeper
        
curator对zk的api做了封装,直接使用curator,初始化
	
    @Bean(name = "curatorframework")
    public Curatorframework curatorframework(ZkProperties zk) throws Exception {
            //构建 Curatorframework
            Curatorframework curatorframework =
                    CuratorframeworkFactory.builder()
                            //连接地址集群用,隔开
                            .connectString(zk.getConnectIps())
                            //连接创建超时时间,单位毫秒
                         .connectionTimeoutMs(zk.getConnectionTimeOutMs())
                            //会话超时时间
                            .sessionTimeoutMs(zk.getSessionTimeOutMs())
                            //配置zookeeper连接的重试策略
                            .retryPolicy(
                                    new ExponentialBackoffRetry(
                                    //每次重试时间间隔,单位毫秒 
                                    //重试次数
                                   zk.getSleepTimeMs(),zk.getMaxRetries()
                                    ))
                            //设置命名空间 在操作节点的时候,会以这个为父节点
                            .namespace("fm-zk")
                            .build();
            curatorframework.start();
            return curatorframework;
    }
zk锁实现
 //通过 InterProcessMutex 该类来获取可重入共性锁
 		InterProcessMutex lock = new InterProcessMutex(
    		(Curatorframework) SpringContext.getBean("curatorframework"), path
        );
		try {
            lock.acquire();
           //这里业务逻辑实现 xxxxx
           //xxxxxxxxxx
        } catch (Exception e) {
            throw new ServerRuntimeException(e);
        } finally {
            if (lock.isAcquiredInThisProcess()) {
                try {
                    lock.release();
                } catch (Exception e) {
                    throw new ServerRuntimeException(e, "zookeeper 释放锁失败");
                }
            }
        }

好了上述zk锁就实现成功了,是不是觉得很简单。但是看上去并不完美,每次都要去获取curator然后加锁释放,我们可以对这块代码进行一下封装。

对zk代码进行封装

首先我们定义两个函数式接口。

@FunctionalInterface
public interface ZkLockBack {
    
    Object lockRun();
}
@FunctionalInterface
public interface ZkLockRunnable {
    
    void lockRun();
}

下面封装zk锁

public class Zk {
    
    public static void lock(ZkLockRunnable runnable, String path) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "锁地址");
        InterProcessMutex lock = new InterProcessMutex(
                (Curatorframework) SpringContext.getBean("curatorframework"), path
        );
        try {
            lock.acquire();
            runnable.lockRun();
        } catch (Exception e) {
            throw new ServerRuntimeException(e);
        } finally {
            if (lock.isAcquiredInThisProcess()) {
                try {
                    lock.release();
                } catch (Exception e) {
                    throw new ServerRuntimeException(e, "zookeeper 释放锁失败");
                }
            }
        }
    }


    
    public static Object lockBack(ZkLockBack runnable, String path) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "锁地址");
        InterProcessMutex lock = new InterProcessMutex(
                (Curatorframework) SpringContext.getBean("curatorframework"), path
        );
        try {
            lock.acquire();
            return runnable.lockRun();
        } catch (Exception e) {
            throw new ServerRuntimeException(e);
        } finally {
            if (lock.isAcquiredInThisProcess()) {
                try {
                    lock.release();
                } catch (Exception e) {
                    throw new ServerRuntimeException(e, "zookeeper 释放锁失败");
                }
            }
        }
    }
}

调用代码实现:

 public static void main(String[] args) {
        //无返回
        Zk.lock(()->{
            //这里是业务逻辑
            
            
        } , "/path");
        //带返回
       int aa = (int) Zk.lockBack(()->{
           //这里是业务逻辑
           
           
            return 0;
        },"/path/aa");
    }

这样调用起来是不是既简单又简洁。

zk的watch实现

zk的watch是什么前面已经讲的很清楚。如果还不明白的可以去前面看看理论知识
首先我们还是准备几个interface对watch进行操作

public interface ZkWatchChildRunnable {

    void create(Object event);

    void update(Object event);

    void delete(Object event);
}
@FunctionalInterface
public interface ZkWatchRunnable {
    
    void watch(Object event);
}
zk的简单节点操作
public class ZkModify {
    
    public static boolean isNode(String path) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        try {
            Stat stat = getClient().checkExists().forPath(path);
            return stat != null ? true : false;
        } catch (Exception e) {
            logger.info("zk是否存在节点, 出现异常:{}", e);
            return false;
        }
    }
    
 	 
    public static boolean create(String path, String data) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, data, "zk-data");
        try {
            if (!isNode(path)) {
                getClient()
                        .create()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(path, data.getBytes());
            } else {
                //重新设置值进去
                getClient().setData().forPath(path, data.getBytes());
            }
            return true;
        } catch (Exception e) {
            logger.info("zk创建节点 , 出现异常:{}", e);
            return false;
        }
    }
    
 	
    public static boolean create(String path, String data, CreateMode createMode) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, data, "zk-data");
        try {
            if (!isNode(path)) {
                getClient()
                        .create()
                        .withMode(createMode)
                        .forPath(path, data.getBytes());
            } else {
                //重新设置值进去
                getClient().setData().forPath(path, data.getBytes());
            }
            return true;
        } catch (Exception e) {
            logger.info("zk创建节点 , 出现异常:{}", e);
            return false;
        }
    }
 }
 
    public static boolean delete(String path) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        try {
            getClient().delete().guaranteed().forPath(path);
            return true;
        } catch (Exception e) {
            logger.info("zk删除一个节点 , 出现异常:{}", e);
            return false;
        }
    }

    
    public static boolean deleteAll(String path) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(path);
            return true;
        } catch (Exception e) {
            logger.info("zk删除一个节点,并且递归删除其所有的子节点, 出现异常:{}", e);
            return false;
        }
    }

    
    public static String get(String path) {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        try {
            byte[] bytes = getClient().getData().forPath(path);
            return new String(bytes);
        } catch (Exception e) {
            logger.info("zk获取节点数据, 出现异常:{}", e);
            return null;
        }
    }

代码可根据自己所需要的封装,并不一定非要和作者一样。

watch的封装
	zk的watch使用一次后会消失,如果需要反复监听需要重新注册进去
	
    public static void zkWatch(ZkWatchRunnable zkWatch, String path, boolean rest) throws Exception {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        if (!isNode(path)) {
            getClient().getData().usingWatcher(new org.apache.zookeeper.Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    zkWatch.watch(watchedEvent);
                    if (rest)
                        //是否再次注册监听
                        getClient().getData().usingWatcher(this);
                }
            }).forPath(path);
        }
    }
    
	
public static void zkWatch(ZkWatchRunnable zkWatch, String path, boolean rest, String data, CreateMode createMode) throws Exception {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        if (create(path, data, createMode)) {
            getClient().getData().usingWatcher(new org.apache.zookeeper.Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    zkWatch.watch(watchedEvent);
                    if (rest)
                        getClient().getData().usingWatcher(this);
                }
            }).forPath(path);
        }
    }

 
    public static void zkWatchPathChildrenCache(ZkWatchChildRunnable zkWatch, String path) throws Exception {
        CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
        if (!isNode(path)) {
            try {
                PathChildrenCache pathChildrenCache =
                        new PathChildrenCache(getClient(), path, true);
                pathChildrenCache.getListenable().addListener((client, event) -> {
                    ChildData childData = event.getData();
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            zkWatch.create(childData);
                            break;
                        case CHILD_UPDATED:
                            zkWatch.update(childData);
                            break;
                        case CHILD_REMOVED:
                            zkWatch.delete(childData);
                            break;
                        default:
                            break;
                    }
                });
                pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

代码调用和上面一样,都是函数式接口,使用lamdba来简化代码,watch部分代码就在上面,只写了几个例子。具体拿到了watchEvent之后,程序需要干啥,就是业务方面的事情了。

总结

zk是一款强大的工具,他的设计目的是为了(最终一致性,可靠性,实时性,等待无关 ,原子性,顺序性)该框架保证了分布式环境中数据的强一致性,也正是基 于这样的特性,使得zookeeper能够应用于很多场景。

代码还得一步一步来,下一系列讲解,redis。redis是个超麻烦的东西,要讲的太多,估计要分十几篇吧,有得更新了。


ps作者这么努力的封装更新各位看官能是否多给点赞和收藏?

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/754674.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号