实现原理:
利用zookeeper顺序操作串行的特性我们可以将zookeeper作为一个全局的锁来使用.
首先创建一个zookeeper基础操作类:
来对zookeeper节点进行操作
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ZookeeperDistributeLock implements DistributedLock {
private ZooKeeper zooKeeper;
private String rootPath;// 根路径名
private String lockNamePre;// 锁前缀
private static ThreadLocal currentLockPath = new ThreadLocal<>();// 用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)
private static int MAX_RETRY_COUNT = 10;// 最大重试次数
public ZookeeperDistributeLock(ZooKeeper zookeeper, String rootPath, String lockNamePre) {
log.info("rootPath:{},lockNamePre:{}", rootPath, lockNamePre);
this.zooKeeper = zookeeper;
this.rootPath = rootPath;
this.lockNamePre = lockNamePre;
init();
}
private void init() {
try {
Stat stat = zooKeeper.exists(rootPath, false);// 判断一下根目录是否存在
if (stat == null) {
zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
log.error("create rootPath error", e);
}
}
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if (index >= 0) {
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
private List getSortedChildren() throws Exception {
List children = zooKeeper.getChildren(rootPath, false);
if (children != null && !children.isEmpty()) {
Collections.sort(children, new Comparator() {
@Override
public int compare(String lhs, String rhs) {
return getLockNodeNumber(lhs, lockNamePre).compareTo(getLockNodeNumber(rhs, lockNamePre));
}
});
}
log.info("sort childRen:{}", children);
return children;
}
private boolean waitToLock(long startMillis, Long millisToWait) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
while (!haveTheLock) {
log.info("get Lock Begin");
// 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
List children = getSortedChildren();
String sequenceNodeName = currentLockPath.get().substring(rootPath.length() + 1);
// 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
int ourIndex = children.indexOf(sequenceNodeName);
if (ourIndex < 0) {
log.error("not find node:{}", sequenceNodeName);
throw new Exception("节点没有找到: " + sequenceNodeName);
}
// 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
// 此时当前客户端需要等待其它客户端释放锁,
boolean isGetTheLock = ourIndex == 0;
// 如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);
if (isGetTheLock) {
log.info("get the lock,currentLockPath:{}", currentLockPath.get());
haveTheLock = true;
} else {
// 如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用CountDownLatch来实现等待
String previousSequencePath = rootPath.concat("/").concat(pathToWatch);
final CountDownLatch latch = new CountDownLatch(1);
final Watcher previousListener = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
}
};
// 如果节点不存在会出现异常
Stat stat = zooKeeper.exists(previousSequencePath, previousListener);
if (stat == null) {
log.error("节点为空,发生异常");
throw new Exception();
}
// 如果有超时时间,刚到超时时间就返回
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
// timed out - delete our node
doDelete = true;
break;
}
latch.await(millisToWait, TimeUnit.MICROSECONDS);
} else {
latch.await();
}
}
}
} catch (Exception e) {
// 发生异常需要删除节点
log.error("waitToLock exception", e);
doDelete = true;
throw e;
} finally {
// 如果需要删除节点
if (doDelete) {
unLock();
}
}
log.info("get Lock end,haveTheLock=" + haveTheLock);
return haveTheLock;
}
private String createLockNode(String path) throws Exception {
Stat stat = zooKeeper.exists(rootPath, false);
// 判断一下根目录是否存在
if (stat == null) {
//创建永久节点
zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//创建临时节点,客户端断开则删除节点
return zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
private Boolean attemptLock(long time, TimeUnit unit) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
// 网络闪断需要重试一试,最大重试次数MAX_RETRY_COUNT
while (!isDone) {
isDone = true;
try {
currentLockPath.set(createLockNode(rootPath.concat("/").concat(lockNamePre)));
hasTheLock = waitToLock(startMillis, millisToWait);
} catch (Exception e) {
if (retryCount++ < MAX_RETRY_COUNT) {
log.error("正在重试::" + retryCount);
isDone = false;
} else {
throw e;
}
}
}
return hasTheLock;
}
@Override
public boolean tryLock() throws Exception {
log.info("tryLock Lock Begin");
// 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
List children = getSortedChildren();
String sequenceNodeName = currentLockPath.get().substring(rootPath.length() + 1);
// 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
int ourIndex = children.indexOf(sequenceNodeName);
if (ourIndex < 0) {
log.error("not find node:{}", sequenceNodeName);
throw new Exception("节点没有找到: " + sequenceNodeName);
}
// 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
return ourIndex == 0;
}
@Override
public void lock() throws Exception {
// -1,null表示阻塞等待,不设置超时时间
attemptLock(-1, null);
}
@Override
public boolean lock(long time, TimeUnit unit) throws Exception {
if (time <= 0) {
throw new Exception("Lock wait for time must greater than 0");
}
if (unit == null) {
throw new Exception("TimeUnit can not be null");
}
return attemptLock(time, unit);
}
@Override
public void unLock() {
try {
Stat stat = zooKeeper.exists(currentLockPath.get(), true);
if (stat != null) {
zooKeeper.delete(currentLockPath.get(), -1);
}
} catch (Exception e) {
log.error("unLock error", e);
} finally {
currentLockPath.remove();
}
}
}
使用方法也很简单,在程序中直接调用lock方法即可上锁,再次调用unlock方法即可释放全局锁
@Around("lockPointCut()")
public Object tryLock(ProceedingJoinPoint pjd) {
Object result;
String methodName = pjd.getSignature().getName();
//执行目标方法
try {
//前置通知
log.info("进入分布式锁切面方法:" + methodName);
//加锁
myLock.lock();
log.info("加锁成功当前线程:"+Thread.currentThread().getName());
//执行方法体
result = pjd.proceed();
} catch (Throwable throwable) {
log.info("锁切面失效");
myLock.unLock();
throwable.printStackTrace();
return new CommonResult().failed("加锁方法执行失败");
} finally {
log.info("释放zookeeper锁");
myLock.unLock();
}
return result;
}



