在父节点(/lock)下创建临时顺序节点,节点的次序编号会按照节点的创建时间依次递增,示意图:
Zookeeper分布式锁可以基于这一点特性实现公平锁,以节点的次序编号来决定哪个节点可以获得锁,即可以用次序编号最小的节点来表示获得锁,每个线程在尝试占用锁之前,首先判断自己节点编号是否是当前最小,如果是,则获取锁。
特性二:ZooKeeper的节点监听机制当前 /lock 节点下的每一个已经创建好的子节点都设置为监听上一个节点,比如 2 号节点监听 1 号节点,3 号节点监听 2 号节点…依次循坏。当 1 号节点释放锁之后,2 号节点会监听到此事件并获取锁,同样,当 2 号节点释放锁之后,3 号节点也能收到通知。
羊群效应(惊群效应):
所有的请求(获取锁的请求)都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争。
可以基于ZooKeeper的节点监听机制有效避免羊群效应。
2. 实现原理- zookeeper客户端创建持久根节点/lock
- 创建临时顺序节点,比如/lock/xxx…001
- 查询/lock节点下面所有子节点,然后判断自己的节点是不是排序最小的那个,此时,如果是最小的则会获得锁。如果自己不是最小的,则从所有子节点里面获取比自己次小的一个节点,然后设置监听该节点的事件,然后挂起当前线程。
- 当获取锁的线程执行
package com.jbp.zookeeper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
public class ZkLock implements Lock, Watcher {
private static final String ZK_LOCK_PATH = "/lock";
private static final String LOCK_PREFIX = ZK_LOCK_PATH + "/";
private static final int LOCK_WAIT_TIME = 1000 * 30;
private ZooKeeper zooKeeper = null;
private String lockName;
private String WAIT_LOCK;
private String CURRENT_LOCK;
private CountDownLatch countDownLatch;
private List exceptionList = new ArrayList<>();
public ZkLock(String zookeeperUrl,String lockName) {
this.lockName = lockName;
try {
// 连接到 zookeeper
ZooKeeper zooKeeper = new ZooKeeper(zookeeperUrl, LOCK_WAIT_TIME, this);
// 判断 ZK分布式锁父节点是否已经存在
Stat flag = zooKeeper.exists(ZK_LOCK_PATH, false);
if (flag == null){
// 父节点不存在,则创建
zooKeeper.create(ZK_LOCK_PATH,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
@SneakyThrows
@Override
public void lock() {
if (exceptionList.size() > 0){
throw new RuntimeException(exceptionList.get(0));
}
if (this.tryLock()){
log.info("当前线程:{},和临界资源:{},获得了锁",Thread.currentThread().getName(),lockName);
return;
}else {
// 等待锁
waitForLock(WAIT_LOCK,LOCK_WAIT_TIME);
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
@Override
public boolean tryLock() {
String splitStr = "_lock_";
if (lockName.contains(splitStr)){
throw new RuntimeException("锁名有误");
}
// 创建临时有序节点
try {
CURRENT_LOCK = zooKeeper.create(LOCK_PREFIX + lockName + splitStr,new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("当前锁:{}已经创建",CURRENT_LOCK);
// 取出父节点下所有子节点
List childrenList = zooKeeper.getChildren(ZK_LOCK_PATH, false);
// 取出所有lockName的锁
ArrayList lockNames = new ArrayList<>();
for (String children : childrenList) {
String childrenLockName = children.split(splitStr)[0];
if (childrenLockName.equals(lockName)){
lockNames.add(children);
}
}
// 排序
Collections.sort(lockNames);
log.info("当前线程:{}获取的锁是:{}",Thread.currentThread().getName(),CURRENT_LOCK);
// 若当前节点为最小节点,则获取锁成功
if (CURRENT_LOCK.equals(LOCK_PREFIX + lockNames.get(0))){
return true;
}
// 若不是最小节点,则找到自己的前一个节点
String preChildren = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
WAIT_LOCK = lockNames.get(Collections.binarySearch(lockNames,preChildren) - 1);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return false;
}
@SneakyThrows
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (this.tryLock()){
return true;
}
return waitForLock(WAIT_LOCK,LOCK_WAIT_TIME);
}
@Override
public void unlock() {
log.info("释放锁:{}",CURRENT_LOCK);
try {
zooKeeper.delete(CURRENT_LOCK,-1);
CURRENT_LOCK = null;
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}finally {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
if (this.countDownLatch != null){
this.countDownLatch.countDown();
}
}
private boolean waitForLock(String preChildren,long waitTime) throws InterruptedException, KeeperException {
// 判断 ZK分布式锁父节点下的 preChildren 子节点是否已经存在
Stat flag = zooKeeper.exists(LOCK_PREFIX + preChildren, false);
if (flag != null){
log.info("当前线程:{}的节点:{}等待锁",Thread.currentThread().getName(),LOCK_PREFIX + preChildren);
this.countDownLatch = new CountDownLatch(1);
// 计数等待,若等到前一个节点消失,则 precess中进行countDown,停止等待,获取锁
this.countDownLatch.await(waitTime,TimeUnit.MILLISECONDS);
this.countDownLatch = null;
log.info("当前线程:{}获取到了锁",Thread.currentThread().getName());
}
return true;
}
}
4. 基于Curator的可重入锁
InterProcessMutex zkLock = new InterProcessMutex(Curatorframework, "/lock"); // 获取锁 zkLock.acquire(); // 释放锁 zkLock.release();5. 优缺点
优点:
- ZooKeeper分布式锁能有效的解决分布式问题,不可重入问题,使用起来也较为简单。
缺点:
- 单独维护一套Zookeeper集群,频繁的Watch对Zookeeper集群的压力比较大。
- 性能不理想,每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能(ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的Follower机器上,这样会造成频繁的网络通信)。



