分析:
可重入的互斥锁,跨JVM工作。使用ZooKeeper来控制锁。所有JVM中的任何进程,只要使用同样的锁路径,将会成为跨进程的一部分。此外,这个排他锁是“公平的”,每个用户按照申请的顺序得到排他锁。可见InterProcessMutex和我们自己实现的例子都是一个排他锁,此外还可以重入。
代码:
public class Curator_Session {
static AtomicInteger n = new AtomicInteger();
static String lockPath = "/curator_recipe_lock_path";
static Curatorframework client = CuratorframeworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
for (int i = 0; i < 30; i++) {
System.out.println("----------------"+n.addAndGet(1)+"------------------");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock.acquire();
System.out.println("--------------");
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println("---------------生成的订单号:" + orderNo);
try {
lock.release();
} catch (Exception exception) {
}
}
}).start();
}
}
}
运行结果:
---------------生成的订单号:15:46:04|460 ---------------生成的订单号:15:46:04|474 ---------------生成的订单号:15:46:04|477 ---------------生成的订单号:15:46:04|489
逻辑分析:首先通过acquire()获取锁,该方法会阻塞进程,直到获取锁,然后执行你的业务方法,最后通过 release()释放锁。
思路:1、创建有序临时节点
2、触发“尝试取锁逻辑”,如果自己是临时锁节点序列的第一个,则取得锁,获取锁成功。
3、如果自己不是序列中第一个,则监听前一个锁节点变更。同时阻塞线程。
4、当前一个锁节点变更时,通过watcher恢复线程,然后再次到步骤2“尝试取锁逻辑”
源码解析:
InterProcessMutex实现了两个接口:
public class InterProcessMutex implements InterProcessLock, Revocable
InterProcessLock是分布式锁接口,分布式锁必须实现接口中的如下方法:
1、获取锁,直到锁可用
public void acquire() throws Exception;
2、在指定等待的时间内获取锁。
public boolean acquire(long time, TimeUnit unit) throws Exception;
3、释放锁
public void release() throws Exception;
4、当前线程是否获取了锁
boolean isAcquiredInThisProcess();
获取锁:
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
获取当前线程
Thread currentThread = Thread.currentThread();
取得当前线程在threadData中的lockData
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
如果存在该线程的锁数据,说明是锁重入, lockData.lockCount加1,直接返回true。获取锁成功
lockData.lockCount.incrementAndGet();
return true;
}
如果不存在该线程的锁数据,则通过internals.attemptLock()获取锁,此时线程被阻塞,直至获得到锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
锁获取成功后,把锁的信息保存到threadData中。
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
如果没能获取到锁,则返回false。
return false;
}
LockInternals源码分析:Curator通过zk实现分布式锁的核心逻辑都在LockInternals中
在InterProcessMutex获取锁的代码分析中,可以看到它是通过internals.attemptLock(time, unit, getLockNodeBytes());来获取锁的,那么我们就以这个方法为入口。
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
通过driver在zk上创建锁节点,获得锁节点路径。
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
通过internalLockLoop()方法阻塞进程,直到获取锁成功
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NonodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
internalLockLoop中通过while自旋,判断锁如果没有被获取,将不断的去尝试获取锁。
while ( (client.getState() == CuratorframeworkState.STARTED) && !haveTheLock )
{
List children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
通过driver查看当前锁节点序号是否排在第一位,如果排在第一位,说明取锁成功,跳出循环
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
如果没有排在第一位,则监听自己的前序锁节点,然后阻塞线程。
当前序节点释放了锁,监听会被触发,恢复线程,此时主线程又回到while中第一步。
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NonodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}



