1、实现原理:
1)定义一个根节点作为加锁节点,如果该根节点已经存在,当前会话将在根节点下方创建顺序临时节点(若会话非正常关闭,临时节点也将被删除),线程进入wait()等待状态;
2)每次选择加锁节点,都需要进行排序,选择序号最小的节点进行加锁;
3)如果不是最小节点就监听前一个节点是否存在,进入wait;
4)若加锁节点不存在了,则将所有的线程都notifyAll,所有机器上的线程再次进行争夺锁,直到有一个线程加锁成功,其它线程进入wait;
2、代码流程(借鉴于:ZooKeeper分布式锁实现java例子,附完整可运行源代码_爱码叔(稀有气体)的博客-CSDN博客_java zookeeper 分布式锁)
package com.example.study.practice.zk.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class LockSample {
private ZooKeeper zkClient;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;
// 监控lockPath的前一个节点的watcher
private Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath() + " 前锁释放");
synchronized (this) {
notifyAll();
}
}
};
public LockSample() throws IOException {
zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()== Watcher.Event.KeeperState.Disconnected){
System.out.println("失去连接");
}
}
});
}
public void acquireLock() throws InterruptedException, KeeperException {
//创建锁节点
createLock();
//尝试获取锁
attemptLock();
}
private void createLock() throws KeeperException, InterruptedException {
//如果根节点不存在,则创建根节点
Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建EPHEMERAL_SEQUENTIAL类型节点
String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
this.lockPath=lockPath;
}
private void attemptLock() throws KeeperException, InterruptedException {
// 获取Lock所有子节点,按照节点序号排序
List lockPaths = null;
lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
Collections.sort(lockPaths);
int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果lockPath是序号最小的节点,则获取锁
if (index == 0) {
System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
return ;
} else {
// lockPath不是序号最小的节点,监控前一个节点
String preLockPath = lockPaths.get(index - 1);
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
// 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
if (stat == null) {
attemptLock();
} else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}
//释放锁的原语实现
public void releaseLock() throws KeeperException, InterruptedException {
zkClient.delete(lockPath, -1);
zkClient.close();
System.out.println(" 锁释放:" + lockPath);
}
}
package com.example.study.practice.zk.lock;
import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class TicketSeller {
private void sell(){
System.out.println("售票开始");
// 线程随机休眠数毫秒,模拟现实中的费时操作
int sleepMillis = (int) (Math.random() * 2000);
try {
//代表复杂逻辑执行了一段时间
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票结束");
}
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
LockSample lock = new LockSample();
lock.acquireLock();
sell();
lock.releaseLock();
}
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
TicketSeller ticketSeller = new TicketSeller();
for(int i=0;i<10;i++){
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
ticketSeller.sellTicketWithLock();
}
}).start();
}
}
}



