学习视频
【尚硅谷】2021新版Zookeeper 3.5.7版本教程
集数:24—28
学习笔记
【Java】学习笔记汇总
一、介绍
什么叫做分布式锁呢?
比如说,"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
二、原生ZooKeeper分布式锁案例手动实现
public class DistributedLock {
private final String connectString = "192.168.150.101:2181";
private final int sessionTimeout = 10000;
private final ZooKeeper zooKeeper;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// connectLatch 如果连接上zk 可以释放
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// waitLatch 需要释放
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
// 等待zk正常连接后,往下走程序,让zookeeper更健壮
connectLatch.await();
// 判断根节点/locks是否存在
Stat exists = zooKeeper.exists("/locks", false);
if (exists == null) {
// 创建根节点
zooKeeper.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 对zk加锁
public void zkLock() {
// 创建对应的临时带序号节点
try {
currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判断创建的节点是否时最小的序号节点,如果是,获取到锁;如果不是,监听他序号前一个节点
List children = zooKeeper.getChildren("/locks", false);
// 如果children只有一个节点,那就直接获取锁;如果有多个节点,需要判断谁最小
if (children.size() == 1) {
return;
} else {
Collections.sort(children);
// 获取节点名称
// 获取节点名称 seq-00000000
String thisNode = currentNode.substring("/locks/".length());
// 通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
// 判断
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// 就一个节点,可以获取锁
return;
} else {
// 需要监听前一个节点
waitPath = "/locks/" + children.get(index - 1);
zooKeeper.getData(waitPath, true, null);
// 等待监听
waitLatch.await();
return;
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
// 对zk解锁
public void unZkLock() {
// 删除节点
try {
zooKeeper.delete(currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
测试代码
public class DistributeClient {
private String connectString = "192.168.150.101:2181";
private int sessionTimeout = 10000;
private ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient client = new DistributeClient();
// 1 获取连接
client.getConnect();
// 2 监听/servers下面子节点的增加和删除
client.getServerList();
// 3 业务逻辑(睡觉)
client.businesss();
}
private void getServerList() throws InterruptedException, KeeperException {
List children = zooKeeper.getChildren("/servers", true);
ArrayList servers = new ArrayList<>();
for (String child : children) {
byte[] data = zooKeeper.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void businesss() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 调用getServerList,可以保证持续监听
try {
getServerList();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
三、Curator框架实现分布式锁案例
3.1 概述
原生的 Java API开发存在的问题
1 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
2 Watch需要重复注册,不然就不能生效
3 开发的复杂性还是比较高的
4 不支持多节点删除和创建。需要自己去递归
Curator是一个专门解决分布式锁的框架,解决了原生 Java API开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html
3.2 Curator案例实操 添加依赖
org.apache.curator curator-framework 4.3.0 org.apache.curator curator-recipes 4.3.0 org.apache.curator curator-client 4.3.0
代码实现
public class CuratorLockTest {
public static void main(String[] args) {
// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorframework(), "/locks");
// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorframework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("thread 1 get lock");
lock1.acquire();
System.out.println("thread 1 get lock again");
Thread.sleep(5000);
lock1.release();
System.out.println("thread 1 release lock");
lock1.release();
System.out.println("thread 1 release lock again");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("thread 2 get lock");
lock2.acquire();
System.out.println("thread 2 get lock again");
Thread.sleep(5000);
lock2.release();
System.out.println("thread 2 release lock");
lock2.release();
System.out.println("thread 2 release lock again");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static Curatorframework getCuratorframework() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 3);
Curatorframework client = CuratorframeworkFactory.builder()
.connectString("192.168.150.101:2181")
.connectionTimeoutMs(10000)
.retryPolicy(retry).build();
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}
控台输出:
thread 2 get lock thread 2 get lock again thread 2 release lock thread 2 release lock again thread 1 get lock thread 1 get lock again thread 1 release lock thread 1 release lock again



