zookeeper 2和3系列正在画图,就先把代码实现的部分发表出来,后面会补上2-3系列的。如需转载请标注一下备注
文章目录
zookeeper-4一、zk实现分布式锁
zk锁问题的解决zk分布式锁的代码实现
curator对zk的api做了封装,直接使用curator,初始化zk锁实现对zk代码进行封装 zk的watch实现
zk的简单节点操作watch的封装 总结
一、zk实现分布式锁 zk要实现分布式锁,首先需要了解分布式锁实现的几个问题:
- 争抢锁-(只有一个人获得了锁)获取锁的那个人出现了问题?获取锁的人成功抢到了锁-》锁释放锁释放了删除了,别人怎么知道
zk锁和redis锁虽然都是分布式锁,但是本质上是不一样的,例如redis锁的实现的几个问题:
- 争抢锁 - (只有一个人抢到了)锁没有过期时间,业务挂了,锁还继续?设置锁超时时间,时间超时了但业务没执行完?设置较长的过期时间,但业务挂了,锁还继续?锁的时间多长合适?
这些都是加锁带来的问题。ZK锁的几个问题在这里会给出讲解,而redis锁的问题在下个系列redis中会讲解出来。
zk锁问题的解决获取锁的那个人出现了问题?
解决:在zk中znode是有几个特点,第一章的时候讲过,znode包含(持久节点,持久序列节点,临时节点,临时序列节点)在这里可以使用znode的临时节点。当加锁的人出现了问题,会断开和zk的链接,(临时节点特点是:当client断开链接后,临时节点消失)临时节点的消失,是不是就意味着锁消失了。
那么zk锁和redis锁一对比大家是不是就发现,在redis里面你需要对锁设置时间,还需要额外开启安全线程对锁进行监控。例如业务没执行完需要增加锁的超时时间等。成本就上去了。
zk锁释放了,别人怎么知道?三种方案
(1)主动轮训,心跳 。 弊端:延迟,多台机器对zookeeper压力
(2)zk的watch来实现,可以解决延迟问题。 弊端:多台机器对zookeeper有压力
(3)利用序列节点+watch 来实现,watch谁?每个序列节点watch前面一个(这里可以理解为队列,每个队列的后面一个watch前面一个目标),最小的一个获得锁,一旦最小的释放了锁,后面一个就会根据watch来获取锁,这样就不用队列所有都对节点watch。成本:zk只给第二个发事件回调。减少了zookeeper压力
下面展示一些 代码片。倒入maven中zk的jar
curator对zk的api做了封装,直接使用curator,初始化org.apache.curator curator-recipes org.apache.zookeeper zookeeper org.apache.curator curator-framework org.apache.zookeeper zookeeper
@Bean(name = "curatorframework")
public Curatorframework curatorframework(ZkProperties zk) throws Exception {
//构建 Curatorframework
Curatorframework curatorframework =
CuratorframeworkFactory.builder()
//连接地址集群用,隔开
.connectString(zk.getConnectIps())
//连接创建超时时间,单位毫秒
.connectionTimeoutMs(zk.getConnectionTimeOutMs())
//会话超时时间
.sessionTimeoutMs(zk.getSessionTimeOutMs())
//配置zookeeper连接的重试策略
.retryPolicy(
new ExponentialBackoffRetry(
//每次重试时间间隔,单位毫秒
//重试次数
zk.getSleepTimeMs(),zk.getMaxRetries()
))
//设置命名空间 在操作节点的时候,会以这个为父节点
.namespace("fm-zk")
.build();
curatorframework.start();
return curatorframework;
}
zk锁实现
//通过 InterProcessMutex 该类来获取可重入共性锁
InterProcessMutex lock = new InterProcessMutex(
(Curatorframework) SpringContext.getBean("curatorframework"), path
);
try {
lock.acquire();
//这里业务逻辑实现 xxxxx
//xxxxxxxxxx
} catch (Exception e) {
throw new ServerRuntimeException(e);
} finally {
if (lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e) {
throw new ServerRuntimeException(e, "zookeeper 释放锁失败");
}
}
}
好了上述zk锁就实现成功了,是不是觉得很简单。但是看上去并不完美,每次都要去获取curator然后加锁释放,我们可以对这块代码进行一下封装。
对zk代码进行封装首先我们定义两个函数式接口。
@FunctionalInterface
public interface ZkLockBack {
Object lockRun();
}
@FunctionalInterface
public interface ZkLockRunnable {
void lockRun();
}
下面封装zk锁
public class Zk {
public static void lock(ZkLockRunnable runnable, String path) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "锁地址");
InterProcessMutex lock = new InterProcessMutex(
(Curatorframework) SpringContext.getBean("curatorframework"), path
);
try {
lock.acquire();
runnable.lockRun();
} catch (Exception e) {
throw new ServerRuntimeException(e);
} finally {
if (lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e) {
throw new ServerRuntimeException(e, "zookeeper 释放锁失败");
}
}
}
}
public static Object lockBack(ZkLockBack runnable, String path) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "锁地址");
InterProcessMutex lock = new InterProcessMutex(
(Curatorframework) SpringContext.getBean("curatorframework"), path
);
try {
lock.acquire();
return runnable.lockRun();
} catch (Exception e) {
throw new ServerRuntimeException(e);
} finally {
if (lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e) {
throw new ServerRuntimeException(e, "zookeeper 释放锁失败");
}
}
}
}
}
调用代码实现:
public static void main(String[] args) {
//无返回
Zk.lock(()->{
//这里是业务逻辑
} , "/path");
//带返回
int aa = (int) Zk.lockBack(()->{
//这里是业务逻辑
return 0;
},"/path/aa");
}
这样调用起来是不是既简单又简洁。
zk的watch实现zk的watch是什么前面已经讲的很清楚。如果还不明白的可以去前面看看理论知识
首先我们还是准备几个interface对watch进行操作
public interface ZkWatchChildRunnable {
void create(Object event);
void update(Object event);
void delete(Object event);
}
@FunctionalInterface
public interface ZkWatchRunnable {
void watch(Object event);
}
zk的简单节点操作
public class ZkModify {
public static boolean isNode(String path) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
try {
Stat stat = getClient().checkExists().forPath(path);
return stat != null ? true : false;
} catch (Exception e) {
logger.info("zk是否存在节点, 出现异常:{}", e);
return false;
}
}
public static boolean create(String path, String data) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, data, "zk-data");
try {
if (!isNode(path)) {
getClient()
.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, data.getBytes());
} else {
//重新设置值进去
getClient().setData().forPath(path, data.getBytes());
}
return true;
} catch (Exception e) {
logger.info("zk创建节点 , 出现异常:{}", e);
return false;
}
}
public static boolean create(String path, String data, CreateMode createMode) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, data, "zk-data");
try {
if (!isNode(path)) {
getClient()
.create()
.withMode(createMode)
.forPath(path, data.getBytes());
} else {
//重新设置值进去
getClient().setData().forPath(path, data.getBytes());
}
return true;
} catch (Exception e) {
logger.info("zk创建节点 , 出现异常:{}", e);
return false;
}
}
}
public static boolean delete(String path) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
try {
getClient().delete().guaranteed().forPath(path);
return true;
} catch (Exception e) {
logger.info("zk删除一个节点 , 出现异常:{}", e);
return false;
}
}
public static boolean deleteAll(String path) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
try {
getClient().delete().deletingChildrenIfNeeded().forPath(path);
return true;
} catch (Exception e) {
logger.info("zk删除一个节点,并且递归删除其所有的子节点, 出现异常:{}", e);
return false;
}
}
public static String get(String path) {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
try {
byte[] bytes = getClient().getData().forPath(path);
return new String(bytes);
} catch (Exception e) {
logger.info("zk获取节点数据, 出现异常:{}", e);
return null;
}
}
代码可根据自己所需要的封装,并不一定非要和作者一样。
watch的封装 zk的watch使用一次后会消失,如果需要反复监听需要重新注册进去
public static void zkWatch(ZkWatchRunnable zkWatch, String path, boolean rest) throws Exception {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
if (!isNode(path)) {
getClient().getData().usingWatcher(new org.apache.zookeeper.Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
zkWatch.watch(watchedEvent);
if (rest)
//是否再次注册监听
getClient().getData().usingWatcher(this);
}
}).forPath(path);
}
}
public static void zkWatch(ZkWatchRunnable zkWatch, String path, boolean rest, String data, CreateMode createMode) throws Exception {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
if (create(path, data, createMode)) {
getClient().getData().usingWatcher(new org.apache.zookeeper.Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
zkWatch.watch(watchedEvent);
if (rest)
getClient().getData().usingWatcher(this);
}
}).forPath(path);
}
}
public static void zkWatchPathChildrenCache(ZkWatchChildRunnable zkWatch, String path) throws Exception {
CheckUtil.check(ExceptionCodeConst.LEMON_PARAMETER_ERROR, path, "zk-path");
if (!isNode(path)) {
try {
PathChildrenCache pathChildrenCache =
new PathChildrenCache(getClient(), path, true);
pathChildrenCache.getListenable().addListener((client, event) -> {
ChildData childData = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
zkWatch.create(childData);
break;
case CHILD_UPDATED:
zkWatch.update(childData);
break;
case CHILD_REMOVED:
zkWatch.delete(childData);
break;
default:
break;
}
});
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码调用和上面一样,都是函数式接口,使用lamdba来简化代码,watch部分代码就在上面,只写了几个例子。具体拿到了watchEvent之后,程序需要干啥,就是业务方面的事情了。
总结zk是一款强大的工具,他的设计目的是为了(最终一致性,可靠性,实时性,等待无关 ,原子性,顺序性)该框架保证了分布式环境中数据的强一致性,也正是基 于这样的特性,使得zookeeper能够应用于很多场景。
代码还得一步一步来,下一系列讲解,redis。redis是个超麻烦的东西,要讲的太多,估计要分十几篇吧,有得更新了。
ps作者这么努力的封装更新各位看官能是否多给点赞和收藏?



