- 一、简介
- 二、maven依赖
- 三、配置类
- 3.1、属性配置文件
- 3.2、属性配置类
- 3.3、ZookeeperConfig配置类(重要)
- 3.4、ZookeeperClient配置类(重要)
- 四、业务编写
- 4.1、抽象类AbstractLock
- 4.2、锁使实现(核心)
- 4.3、controller层
- 五、测试
- 5.1、配置文件application.yml
- 5.2、nginx转发配置
- 5.3、使用jemeter并发测试
- 5.4、测试结果
- 5.5、加锁时间
- 六、InterProcessMutex原理分析
- 6.1、实例化
- 6.1.1、本地调用实例化
- 6.1.2、源码-实例化
- 6.2、获取锁acquire()
- 6.2.1、本地调用获取锁方法
- 6.2.2、源码-internalLock()
- 6.2.3、源码-attemptLock()(重要)
- 6.2.4、源码-createsTheLock()
- 6.2.5、源码-internalLockLoop()(核心)
- 6.2.6、源码-getsTheLock()(重要)
- 6.3、释放锁
- 6.3.1、本地调用释放锁
- 6.3.2、源码-release()(重要)
- 6.3.3、源码-releaseLock
我们知道在JDK 的 java.util.concurrent.locks包中提供了可重入锁,读写锁,及超时获取锁的方法等,但在分布式系统中,当多个应用需要共同操作某一个资源时,就没办法使用JDK里的锁实现了,所以今天的主角就是ZooKeeper + Curator 来完成分布式锁,Curator 提供的四种锁方案:
- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
本文主要介绍可重入排它锁 InterProcessMutex 的相关使用及源码解读。
二、maven依赖pom.xml
三、配置类 3.1、属性配置文件4.0.0 org.springframework.boot spring-boot-starter-parent 2.5.2 com.alian zookeeper-curator 0.0.1-SNAPSHOT zookeeper-curator SpringBoot基于Zookeeper和Curator实现分布式锁 1.8 org.springframework.boot spring-boot-starter-web 2.5.2 org.apache.zookeeper zookeeper 3.6.3 org.slf4j slf4j-log4j12 log4j log4j org.slf4j slf4j-api org.apache.curator curator-framework 5.2.0 org.apache.zookeeper zookeeper org.slf4j slf4j-api org.apache.curator curator-recipes 5.2.0 org.apache.zookeeper zookeeper org.projectlombok lombok 1.16.14 org.springframework.boot spring-boot-maven-plugin
# zookeeper服务器地址(ip+port) zookeeper.server=10.130.3.16:2181 # 休眠时间 zookeeper.sleep-time=1000 # 最大重试次数 zookeeper.max-retries=3 # 会话超时时间 zookeeper.session-timeout=15000 # 连接超时时间 zookeeper.connection-timeout=5000
本机环境有限就不搭建集群了,具体还是在于curator分布式锁的使用及原理。
3.2、属性配置类此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式
ZookeeperProperties.java
package com.alian.zookeepercurator.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
//读取指定路径配置文件,暂不支持*.yaml文件
@PropertySource(value = "classpath:config/zookeeper.properties", encoding = "UTF-8", ignoreResourceNotFound = true)
public class ZookeeperProperties {
private String server;
private int sleepTime;
private int maxRetries;
private int sessionTimeout;
private int connectionTimeout;
}
3.3、ZookeeperConfig配置类(重要)
ZookeeperConfig.java
此配置类主要是使用Curatorframework来连接zookeeper。
package com.alian.zookeepercurator.config;
import com.alian.zookeepercurator.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class ZookeeperConfig {
@Autowired
private ZookeeperProperties zookeeperProperties;
@Bean
public Curatorframework curatorframeworkClient() {
//重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(), zookeeperProperties.getMaxRetries());
//构建Curatorframework实例
Curatorframework curatorframeworkClient = CuratorframeworkFactory
.builder()
.connectString(zookeeperProperties.getServer())
.sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
.retryPolicy(policy)
.build();
//启动实例
curatorframeworkClient.start();
return curatorframeworkClient;
}
//采用这种方式注册bean可以比较优雅的关闭连接
@Bean(destroyMethod = "destroy")
public ZookeeperClient zookeeperClient(Curatorframework curatorframeworkClient) {
return new ZookeeperClient(curatorframeworkClient);
}
}
3.4、ZookeeperClient配置类(重要)
ZookeeperClient.java
这个bean是在上面的配置类里定义的,还定义了销毁的方法,这样的好处是,当服务断开后,可以关闭连接,如果直接关闭服务可能会抛出一个异常。使用和其他的使用是一样的,当然如果你为了方便,使用@Component也没有问题。
package com.alian.zookeepercurator.common;
import com.alian.zookeepercurator.lock.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
@Slf4j
public class ZookeeperClient {
private Curatorframework curatorframework;
public ZookeeperClient(Curatorframework curatorframework) {
this.curatorframework = curatorframework;
}
public T lock(AbstractLock abstractLock) {
//获取锁路径
String lockPath = abstractLock.getLockPath();
//创建InterProcessMutex实例
InterProcessMutex lock = new InterProcessMutex(curatorframework, lockPath); //创建锁对象
boolean success = false;
try {
try {
//加锁
success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
} catch (Exception e) {
throw new RuntimeException("obtain lock error " + e.getMessage() + ", lockPath " + lockPath);
}
//判断是否加锁成功
if (success) {
return abstractLock.execute();
} else {
log.info("获取锁失败,返回null");
return null;
}
} finally {
try {
if (success) {
//释放锁
lock.release();
}
} catch (Exception e) {
log.error("release lock error {}, lockPath {}", e.getMessage(), lockPath);
}
}
}
//bean的销毁方法
public void destroy() {
try {
log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
if (getCuratorframework() != null) {
//这种方式比较优雅的关闭连接
getCuratorframework().close();
}
} catch (Exception e) {
log.error("stop zookeeper client error {}", e.getMessage());
}
}
public Curatorframework getCuratorframework() {
return curatorframework;
}
}
四、业务编写
4.1、抽象类AbstractLock
AbstractLock.java
定义一个抽象锁的类,包含锁路径,过期时间及时间单位,子类只需要实现execute方法即可。
package com.alian.zookeepercurator.common; import java.util.concurrent.TimeUnit; public abstract class AbstractLock4.2、锁使实现(核心){ protected String lockPath; protected long time; protected TimeUnit timeUnit; public AbstractLock(String lockPath, long time, TimeUnit timeUnit) { this.lockPath = lockPath; this.time = time; this.timeUnit = timeUnit; } public void setLockPath(String lockPath) { this.lockPath = lockPath; } public String getLockPath() { return lockPath; } public long getTime() { return time; } public void setTime(long time) { this.time = time; } public void setTimeUnit(TimeUnit timeUnit) { this.timeUnit = timeUnit; } public TimeUnit getTimeUnit() { return timeUnit; } public abstract T execute(); }
CuratorLockService.java
package com.alian.zookeepercurator.service;
import com.alian.zookeepercurator.common.ZookeeperClient;
import com.alian.zookeepercurator.common.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class CuratorLockService {
@Autowired
private ZookeeperClient zookeeperClient;
@Autowired
private Curatorframework curatorframework;
//库存存取的路径
private static final String dataPath = "/root/data/stock";
//初始化库存的路径
private static final String initPath = "/root/init/stock";
@PostConstruct
public void init() {
zookeeperClient.lock(new AbstractLock(initPath, 20, TimeUnit.SECONDS) {
@Override
public Boolean execute() {
try {
//判断是否存在路径
Stat stat = curatorframework.checkExists().forPath(dataPath);
if (stat == null) {
//为空则不存在,则创建并设置库存值
curatorframework.create().forPath(dataPath, "1000".getBytes());
log.info("初始化数据完成");
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
});
}
public String inventoryDeduct(String lockId) {
//我这里是演示,实际对于不同的业务锁路径设置不同,比如支付和订单设置为"/root/pay/"和"/root/order/"
String lockPath = "/root/alian/" + lockId;
//调用加锁方法
Integer result = zookeeperClient.lock(new AbstractLock(lockPath, 10, TimeUnit.SECONDS) {
@Override
public Integer execute() {
try {
//模拟业务处理
byte[] bytes = curatorframework.getData().forPath(dataPath);
String data = new String(bytes);
int stock = Integer.parseInt(data);
if (stock > 0) {
//扣减库存
stock--;
curatorframework.setData().forPath(dataPath, (stock + "").getBytes());
}
return stock;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
});
if (result==null){
log.info("业务执行失败");
return "业务执行失败";
}else {
log.info("执行成功,剩余库存:"+result);
return "执行成功,剩余库存:"+result;
}
}
}
4.3、controller层
CuratorController.java
package com.alian.zookeepercurator.controller;
import com.alian.zookeepercurator.service.CuratorLockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
@Slf4j
@RequestMapping("/test")
@RestController
public class CuratorController {
@Autowired
private CuratorLockService curatorLockService;
@RequestMapping("/deduct")
public String deduct(HttpServletRequest request) {
String lockId = request.getParameter("lockId");
return curatorLockService.inventoryDeduct(lockId);
}
}
五、测试
5.1、配置文件application.yml
由于我们是windows环境下的本机开发及测试,我们使用idea启动两个实例,端口分别为7080和7081。如果不懂的可以参考我另一篇文章:windows下Nginx配置及负载均衡使用
application.yml
server:
port: 7080
servlet:
context-path: /curator
两个实例启动的示例图:
自定义配置文件localhost_80.conf里server模块里增加转发配置,通过负载均衡到两个实例上。
location ~ ^/curator/ {
proxy_redirect off;
#端口
proxy_set_header Host $host;
#远程地址
proxy_set_header X-Real-IP $remote_addr;
#程序可获取远程ip地址
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
#此处会用的upstream.conf,此文件在nginx.conf已经引入了
proxy_pass http://curator;
}
负载均衡配置upstream.conf文件增加下面的配置,其中zookeeper-id 就是localhost_80.conf文件里配置的http://curator;
upstream zookeeper-id {
server 127.0.0.1:7080 ;
server 127.0.0.1:7081 ;
}
如果你的nginx已经启动,最后记得使用命令nginx -t 检查和nginx -s reload应用。
5.3、使用jemeter并发测试本文中使用使用50个线程请求我的接口获取id,50表示线程数,0表示0秒内一起发送,1表示请求循环的次数。
我们请求的地址是:http://localhost/curator/test/deduct,注意是没有端口的,会通过nginx转发到后台实例。
端口为7081实例的结果:
2021-10-21 17:35:47 168 [http-nio-7081-exec-12] INFO inventoryDeduct 83:执行成功,剩余库存:999 2021-10-21 17:35:47 172 [http-nio-7081-exec-2] INFO inventoryDeduct 83:执行成功,剩余库存:998 2021-10-21 17:35:47 174 [http-nio-7081-exec-9] INFO inventoryDeduct 83:执行成功,剩余库存:997 2021-10-21 17:35:47 176 [http-nio-7081-exec-8] INFO inventoryDeduct 83:执行成功,剩余库存:996 2021-10-21 17:35:47 192 [http-nio-7081-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:991 2021-10-21 17:35:47 195 [http-nio-7081-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:990 2021-10-21 17:35:47 201 [http-nio-7081-exec-23] INFO inventoryDeduct 83:执行成功,剩余库存:988 2021-10-21 17:35:47 206 [http-nio-7081-exec-20] INFO inventoryDeduct 83:执行成功,剩余库存:986 2021-10-21 17:35:47 209 [http-nio-7081-exec-16] INFO inventoryDeduct 83:执行成功,剩余库存:985 2021-10-21 17:35:47 216 [http-nio-7081-exec-18] INFO inventoryDeduct 83:执行成功,剩余库存:983 2021-10-21 17:35:47 221 [http-nio-7081-exec-15] INFO inventoryDeduct 83:执行成功,剩余库存:981 2021-10-21 17:35:47 225 [http-nio-7081-exec-14] INFO inventoryDeduct 83:执行成功,剩余库存:980 2021-10-21 17:35:47 228 [http-nio-7081-exec-25] INFO inventoryDeduct 83:执行成功,剩余库存:979 2021-10-21 17:35:47 235 [http-nio-7081-exec-10] INFO inventoryDeduct 83:执行成功,剩余库存:976 2021-10-21 17:35:47 242 [http-nio-7081-exec-21] INFO inventoryDeduct 83:执行成功,剩余库存:973 2021-10-21 17:35:47 246 [http-nio-7081-exec-5] INFO inventoryDeduct 83:执行成功,剩余库存:971 2021-10-21 17:35:47 248 [http-nio-7081-exec-6] INFO inventoryDeduct 83:执行成功,剩余库存:970 2021-10-21 17:35:47 255 [http-nio-7081-exec-24] INFO inventoryDeduct 83:执行成功,剩余库存:967 2021-10-21 17:35:47 261 [http-nio-7081-exec-7] INFO inventoryDeduct 83:执行成功,剩余库存:964 2021-10-21 17:35:47 263 [http-nio-7081-exec-22] INFO inventoryDeduct 83:执行成功,剩余库存:963 2021-10-21 17:35:47 265 [http-nio-7081-exec-4] INFO inventoryDeduct 83:执行成功,剩余库存:962 2021-10-21 17:35:47 272 [http-nio-7081-exec-19] INFO inventoryDeduct 83:执行成功,剩余库存:959 2021-10-21 17:35:47 276 [http-nio-7081-exec-11] INFO inventoryDeduct 83:执行成功,剩余库存:957 2021-10-21 17:35:47 280 [http-nio-7081-exec-13] INFO inventoryDeduct 83:执行成功,剩余库存:955 2021-10-21 17:35:47 283 [http-nio-7081-exec-17] INFO inventoryDeduct 83:执行成功,剩余库存:954
端口为7080实例的结果:
2021-10-21 17:35:47 183 [http-nio-7080-exec-15] INFO inventoryDeduct 83:执行成功,剩余库存:995 2021-10-21 17:35:47 186 [http-nio-7080-exec-21] INFO inventoryDeduct 83:执行成功,剩余库存:994 2021-10-21 17:35:47 188 [http-nio-7080-exec-20] INFO inventoryDeduct 83:执行成功,剩余库存:993 2021-10-21 17:35:47 190 [http-nio-7080-exec-6] INFO inventoryDeduct 83:执行成功,剩余库存:992 2021-10-21 17:35:47 197 [http-nio-7080-exec-17] INFO inventoryDeduct 83:执行成功,剩余库存:989 2021-10-21 17:35:47 203 [http-nio-7080-exec-4] INFO inventoryDeduct 83:执行成功,剩余库存:987 2021-10-21 17:35:47 212 [http-nio-7080-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:984 2021-10-21 17:35:47 218 [http-nio-7080-exec-16] INFO inventoryDeduct 83:执行成功,剩余库存:982 2021-10-21 17:35:47 230 [http-nio-7080-exec-19] INFO inventoryDeduct 83:执行成功,剩余库存:978 2021-10-21 17:35:47 232 [http-nio-7080-exec-24] INFO inventoryDeduct 83:执行成功,剩余库存:977 2021-10-21 17:35:47 237 [http-nio-7080-exec-23] INFO inventoryDeduct 83:执行成功,剩余库存:975 2021-10-21 17:35:47 239 [http-nio-7080-exec-8] INFO inventoryDeduct 83:执行成功,剩余库存:974 2021-10-21 17:35:47 244 [http-nio-7080-exec-5] INFO inventoryDeduct 83:执行成功,剩余库存:972 2021-10-21 17:35:47 251 [http-nio-7080-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:969 2021-10-21 17:35:47 253 [http-nio-7080-exec-10] INFO inventoryDeduct 83:执行成功,剩余库存:968 2021-10-21 17:35:47 257 [http-nio-7080-exec-12] INFO inventoryDeduct 83:执行成功,剩余库存:966 2021-10-21 17:35:47 259 [http-nio-7080-exec-11] INFO inventoryDeduct 83:执行成功,剩余库存:965 2021-10-21 17:35:47 268 [http-nio-7080-exec-25] INFO inventoryDeduct 83:执行成功,剩余库存:961 2021-10-21 17:35:47 270 [http-nio-7080-exec-22] INFO inventoryDeduct 83:执行成功,剩余库存:960 2021-10-21 17:35:47 274 [http-nio-7080-exec-14] INFO inventoryDeduct 83:执行成功,剩余库存:958 2021-10-21 17:35:47 278 [http-nio-7080-exec-9] INFO inventoryDeduct 83:执行成功,剩余库存:956 2021-10-21 17:35:47 285 [http-nio-7080-exec-2] INFO inventoryDeduct 83:执行成功,剩余库存:953 2021-10-21 17:35:47 288 [http-nio-7080-exec-18] INFO inventoryDeduct 83:执行成功,剩余库存:952 2021-10-21 17:35:47 290 [http-nio-7080-exec-7] INFO inventoryDeduct 83:执行成功,剩余库存:951 2021-10-21 17:35:47 292 [http-nio-7080-exec-13] INFO inventoryDeduct 83:执行成功,剩余库存:950
从结果上看来我们的库存正常扣减了1000-50=950,
5.5、加锁时间关于加锁超时的一些说明和举例:假设我们加锁的超时时间是10秒钟,现在有20个并发线程进行库存扣减,每个线程需要执行1秒钟,那么最后我们成功获取到锁的线程只有10个,其他10个线程获取锁失败了,就只会减10个库存,结果如下:
端口为7081实例的结果:
2021-10-21 17:51:15 081 [http-nio-7081-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:948 2021-10-21 17:51:19 145 [http-nio-7081-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:944 2021-10-21 17:51:20 162 [http-nio-7081-exec-9] INFO inventoryDeduct 83:执行成功,剩余库存:943 2021-10-21 17:51:21 175 [http-nio-7081-exec-6] INFO inventoryDeduct 83:执行成功,剩余库存:942 2021-10-21 17:51:23 057 [http-nio-7081-exec-2] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 057 [http-nio-7081-exec-5] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 057 [http-nio-7081-exec-7] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 058 [http-nio-7081-exec-7] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 058 [http-nio-7081-exec-2] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 058 [http-nio-7081-exec-8] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 058 [http-nio-7081-exec-4] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 058 [http-nio-7081-exec-8] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 058 [http-nio-7081-exec-4] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 058 [http-nio-7081-exec-5] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 059 [http-nio-7081-exec-10] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 060 [http-nio-7081-exec-10] INFO inventoryDeduct 80:业务执行失败
端口为7080实例的结果:
2021-10-21 17:51:14 069 [http-nio-7080-exec-10] INFO inventoryDeduct 83:执行成功,剩余库存:949 2021-10-21 17:51:16 100 [http-nio-7080-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:947 2021-10-21 17:51:17 118 [http-nio-7080-exec-8] INFO inventoryDeduct 83:执行成功,剩余库存:946 2021-10-21 17:51:18 131 [http-nio-7080-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:945 2021-10-21 17:51:22 193 [http-nio-7080-exec-5] INFO inventoryDeduct 83:执行成功,剩余库存:941 2021-10-21 17:51:23 056 [http-nio-7080-exec-7] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 056 [http-nio-7080-exec-7] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 057 [http-nio-7080-exec-9] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 057 [http-nio-7080-exec-9] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 057 [http-nio-7080-exec-6] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 057 [http-nio-7080-exec-2] INFO lock 33:获取锁失败,返回null 2021-10-21 17:51:23 057 [http-nio-7080-exec-6] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 057 [http-nio-7080-exec-2] INFO inventoryDeduct 80:业务执行失败 2021-10-21 17:51:23 208 [http-nio-7080-exec-4] INFO inventoryDeduct 83:执行成功,剩余库存:940六、InterProcessMutex原理分析
首先声明下我这里的zookeeper服务的版本3.6.3,zookeeper版本为3.6.3(jar版本),curator版本为5.2.0。
6.1、实例化 6.1.1、本地调用实例化传入Curatorframework和锁路径即可。
//创建InterProcessMutex实例:public InterProcessMutex(Curatorframework client, String path) InterProcessMutex lock = new InterProcessMutex(curatorframework, lockPath);6.1.2、源码-实例化
首先是我们的实例化方法,这个过程中最重要的就是构建LockInternals对象,这个也是整个锁的最核心的实现。
所在的类的具体路径:org.apache.curator.framework.recipes.locks.InterProcessMutex
//锁的名字
private static final String LOCK_NAME = "lock-";
public InterProcessMutex(Curatorframework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(Curatorframework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME, 1, driver);
}
InterProcessMutex(Curatorframework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//校验路径
basePath = PathUtils.validatePath(path);
//实例化LockInternals(所有申请锁与释放锁的核心实现)
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
6.2、获取锁acquire()
6.2.1、本地调用获取锁方法
传入过期时间和时间单位即可。
//获取锁:public boolean acquire(long time, TimeUnit unit) boolean success = lock.acquire(time, timeUnit);6.2.2、源码-internalLock()
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
//关于并发性的说明:给定的lockData实例只能由单个线程执行,因此无需锁定
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering,说明是同一个线程:可重入锁
lockData.lockCount.incrementAndGet();
return true;
}
//核心方法:
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null ) {
//路径不为空则获取到锁
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
//未获取到锁
return false;
}
从上面源码可以知道,此方法中会根据线程号获取数据,如果获取到了说明是同一个线程再次进入了,也就是可重入锁,没有获取到则开始调用核心方法attemptLock去获取锁,此方法会返回锁路径,如果锁路径不为空则表示获取到了锁,并把数据放到线程中,否则表示未获取到锁
6.2.3、源码-attemptLock()(重要)此方法在另一个核心类:org.apache.curator.framework.recipes.locks.LockInternals
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
final long startMillis = System.currentTimeMillis();
//把过期时间转为毫秒数
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//传入的lockNodeBytes是null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//尝试次数
int retryCount = 0;
//锁路径
String ourPath = null;
//是否拥有锁,一般是针对该线程
boolean hasTheLock = false;
//是否完成
boolean isDone = false;
while (!isDone) {
//默认会完成,如果异常则未完成
isDone = true;
try {
//此处的driver就是实例化时的StandardLockInternalsDriver,创建一个临时序列节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//判断当前节点是否获取到了锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch (KeeperException.NoNodeException e) {
// 当StandardLockInternalsDriver找不到锁节点时抛出异常
//这可能发生在会话到期时,等等。因此,如果重试允许,请重试
if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
//标志为完成
isDone = false;
} else {
throw e;
}
}
}
//判断是否拥有锁
if (hasTheLock) {
//获取到了锁则返回路径
return ourPath;
}
return null;
}
完成标志isDone默认为false,只要未完成则循环,先创建临时顺序节点,然后判断当前节点是否获取到了锁,如果获取了则isDone置为true,并返回路径。
6.2.4、源码-createsTheLock()接下里我们分析下:String ourPath = driver.createsTheLock(client, path, localLockNodeBytes);此方法的具体实现在org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver
@Override
public String createsTheLock(Curatorframework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
if (lockNodeBytes != null) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
6.2.5、源码-internalLockLoop()(核心)
接下里我们分析下:boolean hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);此方法的实现在org.apache.curator.framework.recipes.locks.LockInternals
// private final WatcherRemoveCuratorframework client; //也就是对对象引用进行原子级操作,线程安全的 private final AtomicReferencerevocable = new AtomicReference (null); //curator监听器 private final CuratorWatcher revocableWatcher = new CuratorWatcher() //判断自身是否能够持有锁。如果不能,进入wait,等待被唤醒 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if (revocable.get() != null) { //使用CuratorWatcher监听 client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } //客户端状态是启动,并且未获取到锁 while ((client.getState() == CuratorframeworkState.STARTED) && !haveTheLock) { //获取子节点并排序 List children = getSortedChildren(); //就是截取生成路径中的最后一个节点,根据最后一个"/",进行截取 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //判断是否可以持有锁(当前创建的节点是否在上一步获取到的子节点列表的首位) //如果是,说明可以持有锁,那么封装PredicateResults里getsTheLock = true。 //如果不是,说明有其他线程早已先持有了锁,那么封装PredicateResults里getsTheLock = false, //此处还需要获取到自己前一个临时节点的名称pathToWatch PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if (predicateResults.getsTheLock()) { haveTheLock = true; } else { //未获取到锁,得到前一个节点的完整路径 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized (this) { try { // 使用getData()而不是exists(),以避免留下不必要的观察者,这是一种资源泄漏 //添加监听,监听的是前一个节点 client.getData().usingWatcher(watcher).forPath(previousSequencePath); //判断过期时间 if (millisToWait != null) { //过期时间不为空 millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); //判断剩余等待时间,如果小于等于0则置标志位为true,用于删除节点 if (millisToWait <= 0) { doDelete = true; // timed out - delete our node break; } //等待响应时间 wait(millisToWait); } else { //过期时间为空,则一直等待 wait(); } } catch (KeeperException.NoNodeException e) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch (Exception e) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if (doDelete) { //获取到了锁,或者是抛出异常了,删除节点 deleteOurPath(ourPath); } } return haveTheLock; }
当客户端状态是启动,并且未获取到锁,进入循环,获取子节点列表并按升序排序,得到当前节点的节点名,然后判断当前节点是否已经获取到到了锁,如果没有获取到则获取它前一个节点名称,然后通过同步方式监听该节点,如果有过期时间则进行响应时间的等待,如果没有则一直等待。如若已经获取到锁则删除相应的节点,这样下一个监听该节点的监听器可以收到通知。
6.2.6、源码-getsTheLock()(重要) @Override
public PredicateResults getsTheLock(Curatorframework client, List children, String sequenceNodeName, int maxLeases) throws Exception {
//获取当前节点的索引
int ourIndex = children.indexOf(sequenceNodeName);
//校验索引号
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases为1,如果获取到时0表示获取到了锁
boolean getsTheLock = ourIndex < maxLeases;
//如果未获取到锁则获取前一个节点
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
//把结果封装到PredicateResults里
return new PredicateResults(pathToWatch, getsTheLock);
}
每一段代码都看懂了,我相信大家也就懂了整个的加锁的逻辑了。
6.3、释放锁 6.3.1、本地调用释放锁lock.release();6.3.2、源码-release()(重要)
此方法的实现类在:org.apache.curator.framework.recipes.locks.InterProcessMutex
@Override
public void release() throws Exception {
//关于并发性的说明:给定的lockData实例,只能由单个线程执行,因此无需锁定
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//可重入计次数减1,因为可重入锁的原因,
int newLockCount = lockData.lockCount.decrementAndGet();
//判断次数
if (newLockCount > 0) {
//大于则什么都不做,不能释放锁,只是重入锁的处理
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
//释放锁
internals.releaseLock(lockData.lockPath);
} finally {
//移除线程数据
threadData.remove(currentThread);
}
}
如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如线程已多次调用acquire,当此方法返回时,互斥锁仍将保持。
6.3.3、源码-releaseLock final void releaseLock(String lockPath) throws Exception {
//移除监听
client.removeWatchers();
//把加锁时加入的数据清除
revocable.set(null);
//删除节点数据
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception {
try {
client.delete().guaranteed().forPath(ourPath);
} catch ( KeeperException.NoNodeException e ) {
// ignore - already deleted (possibly expired session, etc.)
}
}
加锁和释放锁的代码已经解释的很清楚了,也不是很复杂,我们也可以根据这个思路自己实现加锁和释放锁,后续我们就写一个吧。



